22 Commits

Author SHA1 Message Date
Tim Kuehn
6745cee72c Bump tarpc to v0.18.0 2019-05-11 13:00:35 -07:00
Artem Vorotnikov
31abea18b3 Update to futures-preview 0.3.0-alpha.16 (#230) 2019-05-11 15:18:52 -04:00
Tim Kuehn
593ac135ce Remove stable features from doc examples 2019-04-30 13:18:39 -07:00
Tim Kuehn
05a924d27f Bump tarpc version to 0.17.0 2019-04-30 13:01:45 -07:00
Artem Vorotnikov
af9d71ed0d Bump futures to 0.3.0-alpha.15 (#226) 2019-04-28 20:13:06 -07:00
Tim Kuehn
9b90f6ae51 Bump to v0.16.0 2019-04-16 10:46:53 -07:00
Tim
bbfc8ac352 Merge pull request #216 from vorot93/futures-master
* Use upstream sink compat shims
* Port to new Sink trait introduced in e101c891f04aba34ee29c6a8cd8321563c7e0161
* rustfmt
* Port to std::task::Context
* Add Google license header to bincode-transport/src/compat.rs
* Remove compat for it is no longer needed
* future::join as freestanding function
* Simplify dependencies
* Depend on futures-preview 0.3.0-alpha.14
* Fix infinite recursion
2019-04-16 08:43:10 -07:00
Tim
ad86a967ba Fix infinite recursion 2019-04-16 18:27:42 +03:00
Artem Vorotnikov
58a0eced19 Depend on futures-preview 0.3.0-alpha.14 2019-04-15 21:16:20 +03:00
Artem Vorotnikov
46fffd13e7 Simplify dependencies 2019-04-15 21:14:25 +03:00
Artem Vorotnikov
6c8d4be462 future::join as freestanding function 2019-04-15 20:30:04 +03:00
Artem Vorotnikov
e3a517bf0d Remove compat and transmute for they are no longer needed 2019-04-15 20:24:09 +03:00
Artem Vorotnikov
f4e22bdc2e Port to std::task::Context 2019-04-15 20:22:15 +03:00
Artem Vorotnikov
46f56fbdc0 Add Google license header to bincode-transport/src/compat.rs 2019-04-15 20:22:15 +03:00
Artem Vorotnikov
8665655592 Fix test client breakage by 9100ea46f997f24d4bc8c1764d0fe3ff8226ad2a 2019-04-15 20:22:15 +03:00
Artem Vorotnikov
4569d26d81 rustfmt 2019-04-15 20:22:15 +03:00
Artem Vorotnikov
b8b92ddb5f Workaround for stack overflow caused by 2a95710db0e2d85094938776ebb4f270bc389c41 2019-04-15 20:16:48 +03:00
Artem Vorotnikov
8dd3390876 Port to new Sink trait introduced in e101c891f04aba34ee29c6a8cd8321563c7e0161 2019-04-15 20:16:48 +03:00
Artem Vorotnikov
06c420b60c Use upstream sink compat shims 2019-04-15 20:16:48 +03:00
Artem Vorotnikov
a7fb4d22cc Switch to master branch of futures-preview 2019-04-15 20:16:48 +03:00
Tim
b1cd5f34e5 Don't panic in pump_write when a client is dropped and there are more calls to poll. (#221)
This can happen in cases where a response is being read and the client isn't around.

Fixes #220
2019-04-15 09:42:53 -07:00
Artem Vorotnikov
088e5f8f2c Remove deprecated feature from bincode dependency (#218) 2019-04-04 10:34:11 -07:00
28 changed files with 369 additions and 490 deletions

View File

@@ -33,7 +33,7 @@ arguments to tarpc fns.
Add to your `Cargo.toml` dependencies: Add to your `Cargo.toml` dependencies:
```toml ```toml
tarpc = "0.15.0" tarpc = "0.18.0"
``` ```
The `service!` macro expands to a collection of items that form an The `service!` macro expands to a collection of items that form an
@@ -48,7 +48,7 @@ races!
Here's a small service. Here's a small service.
```rust ```rust
#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)] #![feature(arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
use futures::{ use futures::{

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc-bincode-transport" name = "tarpc-bincode-transport"
version = "0.4.0" version = "0.7.0"
authors = ["Tim Kuehn <tikue@google.com>"] authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018' edition = '2018'
license = "MIT" license = "MIT"
@@ -13,20 +13,17 @@ readme = "../README.md"
description = "A bincode-based transport for tarpc services." description = "A bincode-based transport for tarpc services."
[dependencies] [dependencies]
bincode = { version = "1.0", features = ["i128"] } bincode = "1"
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
futures_legacy = { version = "0.1", package = "futures" } futures_legacy = { version = "0.1", package = "futures" }
pin-utils = "0.1.0-alpha.4" pin-utils = "0.1.0-alpha.4"
rpc = { package = "tarpc-lib", version = "0.3", path = "../rpc", features = ["serde1"] } rpc = { package = "tarpc-lib", version = "0.6", path = "../rpc", features = ["serde1"] }
serde = "1.0" serde = "1.0"
tokio-io = "0.1" tokio-io = "0.1"
async-bincode = "0.4" async-bincode = "0.4"
tokio-tcp = "0.1" tokio-tcp = "0.1"
[target.'cfg(not(test))'.dependencies]
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
[dev-dependencies] [dev-dependencies]
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
env_logger = "0.6" env_logger = "0.6"
humantime = "1.0" humantime = "1.0"
libtest = "0.0.1" libtest = "0.0.1"

View File

@@ -1,150 +0,0 @@
use futures::{compat::Stream01CompatExt, prelude::*, ready};
use futures_legacy::{
executor::{
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
UnsafeNotify as UnsafeNotify01,
},
Async as Async01, AsyncSink as AsyncSink01, Sink as Sink01, Stream as Stream01,
};
use std::{
pin::Pin,
task::{self, Poll, Waker},
};
/// A shim to convert a 0.1 Sink + Stream to a 0.3 Sink + Stream.
#[derive(Debug)]
pub struct Compat<S, SinkItem> {
staged_item: Option<SinkItem>,
inner: S,
}
impl<S, SinkItem> Compat<S, SinkItem> {
/// Returns a new Compat.
pub fn new(inner: S) -> Self {
Compat {
inner,
staged_item: None,
}
}
/// Unwraps Compat, returning the inner value.
pub fn into_inner(self) -> S {
self.inner
}
/// Returns a reference to the value wrapped by Compat.
pub fn get_ref(&self) -> &S {
&self.inner
}
}
impl<S, SinkItem> Stream for Compat<S, SinkItem>
where
S: Stream01,
{
type Item = Result<S::Item, S::Error>;
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
unsafe {
let inner = &mut Pin::get_unchecked_mut(self).inner;
let mut compat = inner.compat();
let compat = Pin::new_unchecked(&mut compat);
match ready!(compat.poll_next(waker)) {
None => Poll::Ready(None),
Some(Ok(next)) => Poll::Ready(Some(Ok(next))),
Some(Err(e)) => Poll::Ready(Some(Err(e))),
}
}
}
}
impl<S, SinkItem> Sink for Compat<S, SinkItem>
where
S: Sink01<SinkItem = SinkItem>,
{
type SinkItem = SinkItem;
type SinkError = S::SinkError;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), S::SinkError> {
let me = unsafe { Pin::get_unchecked_mut(self) };
assert!(me.staged_item.is_none());
me.staged_item = Some(item);
Ok(())
}
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.staged_item.take() {
Some(staged_item) => match me.inner.start_send(staged_item) {
Ok(AsyncSink01::Ready) => Poll::Ready(Ok(())),
Ok(AsyncSink01::NotReady(item)) => {
me.staged_item = Some(item);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
},
None => Poll::Ready(Ok(())),
}
})
}
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.inner.poll_complete() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
})
}
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> {
let notify = &WakerToHandle(waker);
executor01::with_notify(notify, 0, move || {
let me = unsafe { Pin::get_unchecked_mut(self) };
match me.inner.close() {
Ok(Async01::Ready(())) => Poll::Ready(Ok(())),
Ok(Async01::NotReady) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
})
}
}
#[derive(Clone, Debug)]
struct WakerToHandle<'a>(&'a Waker);
#[derive(Debug)]
struct NotifyWaker(task::Waker);
impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
}
}
unsafe impl UnsafeNotify01 for NotifyWaker {
unsafe fn clone_raw(&self) -> NotifyHandle01 {
let ptr = Box::new(NotifyWaker(self.0.clone()));
NotifyHandle01::new(Box::into_raw(ptr))
}
unsafe fn drop_raw(&self) {
let ptr: *const dyn UnsafeNotify01 = self;
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
}
}
impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
unsafe { NotifyWaker(handle.0.clone()).clone_raw() }
}
}

View File

@@ -6,16 +6,11 @@
//! A TCP [`Transport`] that serializes as bincode. //! A TCP [`Transport`] that serializes as bincode.
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)] #![feature(arbitrary_self_types, async_await)]
#![deny(missing_docs, missing_debug_implementations)] #![deny(missing_docs, missing_debug_implementations)]
use self::compat::Compat;
use async_bincode::{AsyncBincodeStream, AsyncDestination}; use async_bincode::{AsyncBincodeStream, AsyncDestination};
use futures::{ use futures::{compat::*, prelude::*, ready};
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt},
prelude::*,
ready,
};
use pin_utils::unsafe_pinned; use pin_utils::unsafe_pinned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
@@ -24,29 +19,20 @@ use std::{
marker::PhantomData, marker::PhantomData,
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
task::{Poll, Waker}, task::{Context, Poll},
}; };
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{TcpListener, TcpStream}; use tokio_tcp::{TcpListener, TcpStream};
mod compat;
/// A transport that serializes to, and deserializes from, a [`TcpStream`]. /// A transport that serializes to, and deserializes from, a [`TcpStream`].
#[derive(Debug)] #[derive(Debug)]
pub struct Transport<S, Item, SinkItem> { pub struct Transport<S, Item, SinkItem> {
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>, inner: Compat01As03Sink<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>,
}
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
/// Returns the transport underlying the bincode transport.
pub fn into_inner(self) -> S {
self.inner.into_inner().into_inner()
}
} }
impl<S, Item, SinkItem> Transport<S, Item, SinkItem> { impl<S, Item, SinkItem> Transport<S, Item, SinkItem> {
unsafe_pinned!( unsafe_pinned!(
inner: Compat<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem> inner: Compat01As03Sink<AsyncBincodeStream<S, Item, SinkItem, AsyncDestination>, SinkItem>
); );
} }
@@ -57,8 +43,8 @@ where
{ {
type Item = io::Result<Item>; type Item = io::Result<Item>;
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<io::Result<Item>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Item>>> {
match self.inner().poll_next(waker) { match self.inner().poll_next(cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None), Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))), Poll::Ready(Some(Ok(next))) => Poll::Ready(Some(Ok(next))),
@@ -69,12 +55,11 @@ where
} }
} }
impl<S, Item, SinkItem> Sink for Transport<S, Item, SinkItem> impl<S, Item, SinkItem> Sink<SinkItem> for Transport<S, Item, SinkItem>
where where
S: AsyncWrite, S: AsyncWrite,
SinkItem: Serialize, SinkItem: Serialize,
{ {
type SinkItem = SinkItem;
type SinkError = io::Error; type SinkError = io::Error;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
@@ -83,16 +68,16 @@ where
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
} }
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> { fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
convert(self.inner().poll_ready(waker)) convert(self.inner().poll_ready(cx))
} }
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
convert(self.inner().poll_flush(waker)) convert(self.inner().poll_flush(cx))
} }
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
convert(self.inner().poll_close(waker)) convert(self.inner().poll_close(cx))
} }
} }
@@ -133,7 +118,7 @@ where
impl<S, Item, SinkItem> From<S> for Transport<S, Item, SinkItem> { impl<S, Item, SinkItem> From<S> for Transport<S, Item, SinkItem> {
fn from(inner: S) -> Self { fn from(inner: S) -> Self {
Transport { Transport {
inner: Compat::new(AsyncBincodeStream::from(inner).for_async()), inner: Compat01As03Sink::new(AsyncBincodeStream::from(inner).for_async()),
} }
} }
} }
@@ -146,7 +131,7 @@ where
Item: for<'de> Deserialize<'de>, Item: for<'de> Deserialize<'de>,
SinkItem: Serialize, SinkItem: Serialize,
{ {
Ok(new(await!(TcpStream::connect(addr).compat())?)) Ok(new(TcpStream::connect(addr).compat().await?))
} }
/// Listens on `addr`, wrapping accepted connections in bincode transports. /// Listens on `addr`, wrapping accepted connections in bincode transports.
@@ -189,8 +174,8 @@ where
{ {
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>; type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = ready!(self.incoming().poll_next(waker)?); let next = ready!(self.incoming().poll_next(cx)?);
Poll::Ready(next.map(|conn| Ok(new(conn)))) Poll::Ready(next.map(|conn| Ok(new(conn))))
} }
} }

View File

@@ -6,14 +6,7 @@
//! Tests client/server control flow. //! Tests client/server control flow.
#![feature( #![feature(test, integer_atomics, async_await)]
test,
integer_atomics,
futures_api,
generators,
await_macro,
async_await
)]
use futures::{compat::Executor01CompatExt, prelude::*}; use futures::{compat::Executor01CompatExt, prelude::*};
use libtest::stats::Stats; use libtest::stats::Stats;
@@ -40,8 +33,8 @@ async fn bench() -> io::Result<()> {
.compat(), .compat(),
); );
let conn = await!(tarpc_bincode_transport::connect(&addr))?; let conn = tarpc_bincode_transport::connect(&addr).await?;
let client = &mut await!(client::new::<u32, u32, _>(client::Config::default(), conn))?; let client = &mut client::new::<u32, u32, _>(client::Config::default(), conn).await?;
let total = 10_000usize; let total = 10_000usize;
let mut successful = 0u32; let mut successful = 0u32;
@@ -49,7 +42,7 @@ async fn bench() -> io::Result<()> {
let mut durations = vec![]; let mut durations = vec![];
for _ in 1..=total { for _ in 1..=total {
let now = Instant::now(); let now = Instant::now();
let response = await!(client.call(context::current(), 0u32)); let response = client.call(context::current(), 0u32).await;
let elapsed = now.elapsed(); let elapsed = now.elapsed();
match response { match response {

View File

@@ -6,12 +6,12 @@
//! Tests client/server control flow. //! Tests client/server control flow.
#![feature(generators, await_macro, async_await, futures_api)] #![feature(async_await)]
use futures::{ use futures::{
compat::{Executor01CompatExt, Future01CompatExt}, compat::{Executor01CompatExt, Future01CompatExt},
prelude::*, prelude::*,
stream, stream::FuturesUnordered,
}; };
use log::{info, trace}; use log::{info, trace};
use rand::distributions::{Distribution, Normal}; use rand::distributions::{Distribution, Normal};
@@ -66,7 +66,7 @@ async fn run() -> io::Result<()> {
let wait = Delay::new(Instant::now() + delay).compat(); let wait = Delay::new(Instant::now() + delay).compat();
async move { async move {
await!(wait).unwrap(); wait.await.unwrap();
Ok(request) Ok(request)
} }
}); });
@@ -75,11 +75,8 @@ async fn run() -> io::Result<()> {
tokio_executor::spawn(server.unit_error().boxed().compat()); tokio_executor::spawn(server.unit_error().boxed().compat());
let conn = await!(tarpc_bincode_transport::connect(&addr))?; let conn = tarpc_bincode_transport::connect(&addr).await?;
let client = await!(client::new::<String, String, _>( let client = client::new::<String, String, _>(client::Config::default(), conn).await?;
client::Config::default(),
conn
))?;
// Proxy service // Proxy service
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?; let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
@@ -99,7 +96,7 @@ async fn run() -> io::Result<()> {
let handler = channel.respond_with(move |ctx, request| { let handler = channel.respond_with(move |ctx, request| {
trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr); trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr);
let mut client = client.clone(); let mut client = client.clone();
async move { await!(client.call(ctx, request)) } async move { client.call(ctx, request).await }
}); });
tokio_executor::spawn(handler.unit_error().boxed().compat()); tokio_executor::spawn(handler.unit_error().boxed().compat());
} }
@@ -111,10 +108,9 @@ async fn run() -> io::Result<()> {
config.max_in_flight_requests = 10; config.max_in_flight_requests = 10;
config.pending_request_buffer = 10; config.pending_request_buffer = 10;
let client = await!(client::new::<String, String, _>( let client =
config, client::new::<String, String, _>(config, tarpc_bincode_transport::connect(&addr).await?)
await!(tarpc_bincode_transport::connect(&addr))? .await?;
))?;
// Make 3 speculative requests, returning only the quickest. // Make 3 speculative requests, returning only the quickest.
let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect(); let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect();
@@ -126,7 +122,11 @@ async fn run() -> io::Result<()> {
let response = client.call(ctx, "ping".into()); let response = client.call(ctx, "ping".into());
requests.push(response.map(move |r| (trace_id, r))); requests.push(response.map(move |r| (trace_id, r)));
} }
let (fastest_response, _) = await!(stream::futures_unordered(requests).into_future()); let (fastest_response, _) = requests
.into_iter()
.collect::<FuturesUnordered<_>>()
.into_future()
.await;
let (trace_id, resp) = fastest_response.unwrap(); let (trace_id, resp) = fastest_response.unwrap();
info!("[{}] fastest_response = {:?}", trace_id, resp); info!("[{}] fastest_response = {:?}", trace_id, resp);

View File

@@ -6,7 +6,7 @@
//! Tests client/server control flow. //! Tests client/server control flow.
#![feature(generators, await_macro, async_await, futures_api)] #![feature(async_await)]
use futures::{ use futures::{
compat::{Executor01CompatExt, Future01CompatExt}, compat::{Executor01CompatExt, Future01CompatExt},
@@ -65,7 +65,7 @@ async fn run() -> io::Result<()> {
let sleep = Delay::new(Instant::now() + delay).compat(); let sleep = Delay::new(Instant::now() + delay).compat();
async { async {
await!(sleep).unwrap(); sleep.await.unwrap();
Ok(request) Ok(request)
} }
}); });
@@ -78,8 +78,8 @@ async fn run() -> io::Result<()> {
config.max_in_flight_requests = 10; config.max_in_flight_requests = 10;
config.pending_request_buffer = 10; config.pending_request_buffer = 10;
let conn = await!(tarpc_bincode_transport::connect(&addr))?; let conn = tarpc_bincode_transport::connect(&addr).await?;
let client = await!(client::new::<String, String, _>(config, conn))?; let client = client::new::<String, String, _>(config, conn).await?;
let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>(); let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
for mut client in clients { for mut client in clients {
@@ -88,7 +88,7 @@ async fn run() -> io::Result<()> {
async move { async move {
let trace_id = *ctx.trace_id(); let trace_id = *ctx.trace_id();
let response = client.call(ctx, "ping".into()); let response = client.call(ctx, "ping".into());
match await!(response) { match response.await {
Ok(response) => info!("[{}] response: {}", trace_id, response), Ok(response) => info!("[{}] response: {}", trace_id, response),
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e), Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
} }

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc-example-service" name = "tarpc-example-service"
version = "0.3.0" version = "0.6.0"
authors = ["Tim Kuehn <tikue@google.com>"] authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018" edition = "2018"
license = "MIT" license = "MIT"
@@ -13,11 +13,11 @@ readme = "../README.md"
description = "An example server built on tarpc." description = "An example server built on tarpc."
[dependencies] [dependencies]
bincode-transport = { package = "tarpc-bincode-transport", version = "0.4", path = "../bincode-transport" } bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
clap = "2.0" clap = "2.0"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
serde = { version = "1.0" } serde = { version = "1.0" }
tarpc = { version = "0.15", path = "../tarpc", features = ["serde1"] } tarpc = { version = "0.18", path = "../tarpc", features = ["serde1"] }
tokio = "0.1" tokio = "0.1"
tokio-executor = "0.1" tokio-executor = "0.1"

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at // license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT. // https://opensource.org/licenses/MIT.
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)] #![feature(arbitrary_self_types, async_await)]
use clap::{App, Arg}; use clap::{App, Arg};
use futures::{compat::Executor01CompatExt, prelude::*}; use futures::{compat::Executor01CompatExt, prelude::*};
@@ -12,17 +12,17 @@ use std::{io, net::SocketAddr};
use tarpc::{client, context}; use tarpc::{client, context};
async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> { async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> {
let transport = await!(bincode_transport::connect(&server_addr))?; let transport = bincode_transport::connect(&server_addr).await?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any // new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro. // Transport as input, and returns a Client, also generated by the macro.
// by the service mcro. // by the service mcro.
let mut client = await!(service::new_stub(client::Config::default(), transport))?; let mut client = service::new_stub(client::Config::default(), transport).await?;
// The client has an RPC method for each RPC defined in service!. It takes the same args // The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context // as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests. // specifies a deadline and trace information which can be helpful in debugging requests.
let hello = await!(client.hello(context::current(), name))?; let hello = client.hello(context::current(), name).await?;
println!("{}", hello); println!("{}", hello);

View File

@@ -4,13 +4,7 @@
// license that can be found in the LICENSE file or at // license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT. // https://opensource.org/licenses/MIT.
#![feature( #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
futures_api,
arbitrary_self_types,
await_macro,
async_await,
proc_macro_hygiene
)]
// This is the service definition. It looks a lot like a trait definition. // This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String. // It defines one RPC, hello, which takes one arg, name, and returns a String.

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at // license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT. // https://opensource.org/licenses/MIT.
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)] #![feature(arbitrary_self_types, async_await)]
use clap::{App, Arg}; use clap::{App, Arg};
use futures::{ use futures::{
@@ -47,7 +47,7 @@ async fn run(server_addr: SocketAddr) -> io::Result<()> {
// the generated Service trait. // the generated Service trait.
.respond_with(service::serve(HelloServer)); .respond_with(service::serve(HelloServer));
await!(server); server.await;
Ok(()) Ok(())
} }

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc-lib" name = "tarpc-lib"
version = "0.3.0" version = "0.6.0"
authors = ["Tim Kuehn <tikue@google.com>"] authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018' edition = '2018'
license = "MIT" license = "MIT"
@@ -18,6 +18,7 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
[dependencies] [dependencies]
fnv = "1.0" fnv = "1.0"
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
humantime = "1.0" humantime = "1.0"
log = "0.4" log = "0.4"
pin-utils = "0.1.0-alpha.4" pin-utils = "0.1.0-alpha.4"
@@ -26,11 +27,7 @@ tokio-timer = "0.2"
trace = { package = "tarpc-trace", version = "0.2", path = "../trace" } trace = { package = "tarpc-trace", version = "0.2", path = "../trace" }
serde = { optional = true, version = "1.0" } serde = { optional = true, version = "1.0" }
[target.'cfg(not(test))'.dependencies]
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] }
[dev-dependencies] [dev-dependencies]
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } futures-test-preview = { version = "0.3.0-alpha.16" }
futures-test-preview = { version = "0.3.0-alpha.13" }
env_logger = "0.6" env_logger = "0.6"
tokio = "0.1" tokio = "0.1"

View File

@@ -15,7 +15,7 @@ use futures::{
prelude::*, prelude::*,
ready, ready,
stream::Fuse, stream::Fuse,
task::Waker, task::Context,
Poll, Poll,
}; };
use humantime::format_rfc3339; use humantime::format_rfc3339;
@@ -65,14 +65,19 @@ struct Send<'a, Req, Resp> {
fut: MapOkDispatchResponse<SendMapErrConnectionReset<'a, Req, Resp>, Resp>, fut: MapOkDispatchResponse<SendMapErrConnectionReset<'a, Req, Resp>, Resp>,
} }
type SendMapErrConnectionReset<'a, Req, Resp> = type SendMapErrConnectionReset<'a, Req, Resp> = MapErrConnectionReset<
MapErrConnectionReset<futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>>; futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>, DispatchRequest<Req, Resp>>,
>;
impl<'a, Req, Resp> Send<'a, Req, Resp> { impl<'a, Req, Resp> Send<'a, Req, Resp> {
unsafe_pinned!( unsafe_pinned!(
fut: MapOkDispatchResponse< fut: MapOkDispatchResponse<
MapErrConnectionReset< MapErrConnectionReset<
futures::sink::Send<'a, mpsc::Sender<DispatchRequest<Req, Resp>>>, futures::sink::Send<
'a,
mpsc::Sender<DispatchRequest<Req, Resp>>,
DispatchRequest<Req, Resp>,
>,
>, >,
Resp, Resp,
> >
@@ -82,8 +87,8 @@ impl<'a, Req, Resp> Send<'a, Req, Resp> {
impl<'a, Req, Resp> Future for Send<'a, Req, Resp> { impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
type Output = io::Result<DispatchResponse<Resp>>; type Output = io::Result<DispatchResponse<Resp>>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut().fut().poll(waker) self.as_mut().fut().poll(cx)
} }
} }
@@ -101,8 +106,8 @@ impl<'a, Req, Resp> Call<'a, Req, Resp> {
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
type Output = io::Result<Resp>; type Output = io::Result<Resp>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut().fut().poll(waker) self.as_mut().fut().poll(cx)
} }
} }
@@ -177,8 +182,8 @@ impl<Resp> DispatchResponse<Resp> {
impl<Resp> Future for DispatchResponse<Resp> { impl<Resp> Future for DispatchResponse<Resp> {
type Output = io::Result<Resp>; type Output = io::Result<Resp>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<Resp>> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Resp>> {
let resp = ready!(self.response.poll_unpin(waker)); let resp = ready!(self.response.poll_unpin(cx));
self.complete = true; self.complete = true;
@@ -258,6 +263,7 @@ where
{ {
let (to_dispatch, pending_requests) = mpsc::channel(config.pending_request_buffer); let (to_dispatch, pending_requests) = mpsc::channel(config.pending_request_buffer);
let (cancellation, canceled_requests) = cancellations(); let (cancellation, canceled_requests) = cancellations();
let canceled_requests = canceled_requests.fuse();
crate::spawn( crate::spawn(
RequestDispatch { RequestDispatch {
@@ -296,7 +302,7 @@ struct RequestDispatch<Req, Resp, C> {
/// Requests waiting to be written to the wire. /// Requests waiting to be written to the wire.
pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>, pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>,
/// Requests that were dropped. /// Requests that were dropped.
canceled_requests: CanceledRequests, canceled_requests: Fuse<CanceledRequests>,
/// Requests already written to the wire that haven't yet received responses. /// Requests already written to the wire that haven't yet received responses.
in_flight_requests: FnvHashMap<u64, InFlightData<Resp>>, in_flight_requests: FnvHashMap<u64, InFlightData<Resp>>,
/// Configures limits to prevent unlimited resource usage. /// Configures limits to prevent unlimited resource usage.
@@ -313,12 +319,12 @@ where
{ {
unsafe_pinned!(server_addr: SocketAddr); unsafe_pinned!(server_addr: SocketAddr);
unsafe_pinned!(in_flight_requests: FnvHashMap<u64, InFlightData<Resp>>); unsafe_pinned!(in_flight_requests: FnvHashMap<u64, InFlightData<Resp>>);
unsafe_pinned!(canceled_requests: CanceledRequests); unsafe_pinned!(canceled_requests: Fuse<CanceledRequests>);
unsafe_pinned!(pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>); unsafe_pinned!(pending_requests: Fuse<mpsc::Receiver<DispatchRequest<Req, Resp>>>);
unsafe_pinned!(transport: Fuse<C>); unsafe_pinned!(transport: Fuse<C>);
fn pump_read(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> { fn pump_read(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
Poll::Ready(match ready!(self.as_mut().transport().poll_next(waker)?) { Poll::Ready(match ready!(self.as_mut().transport().poll_next(cx)?) {
Some(response) => { Some(response) => {
self.complete(response); self.complete(response);
Some(Ok(())) Some(Ok(()))
@@ -330,13 +336,13 @@ where
}) })
} }
fn pump_write(self: &mut Pin<&mut Self>, waker: &Waker) -> PollIo<()> { fn pump_write(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
enum ReceiverStatus { enum ReceiverStatus {
NotReady, NotReady,
Closed, Closed,
} }
let pending_requests_status = match self.poll_next_request(waker)? { let pending_requests_status = match self.poll_next_request(cx)? {
Poll::Ready(Some(dispatch_request)) => { Poll::Ready(Some(dispatch_request)) => {
self.write_request(dispatch_request)?; self.write_request(dispatch_request)?;
return Poll::Ready(Some(Ok(()))); return Poll::Ready(Some(Ok(())));
@@ -345,7 +351,7 @@ where
Poll::Pending => ReceiverStatus::NotReady, Poll::Pending => ReceiverStatus::NotReady,
}; };
let canceled_requests_status = match self.poll_next_cancellation(waker)? { let canceled_requests_status = match self.poll_next_cancellation(cx)? {
Poll::Ready(Some((context, request_id))) => { Poll::Ready(Some((context, request_id))) => {
self.write_cancel(context, request_id)?; self.write_cancel(context, request_id)?;
return Poll::Ready(Some(Ok(()))); return Poll::Ready(Some(Ok(())));
@@ -356,12 +362,12 @@ where
match (pending_requests_status, canceled_requests_status) { match (pending_requests_status, canceled_requests_status) {
(ReceiverStatus::Closed, ReceiverStatus::Closed) => { (ReceiverStatus::Closed, ReceiverStatus::Closed) => {
ready!(self.as_mut().transport().poll_flush(waker)?); ready!(self.as_mut().transport().poll_flush(cx)?);
Poll::Ready(None) Poll::Ready(None)
} }
(ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => { (ReceiverStatus::NotReady, _) | (_, ReceiverStatus::NotReady) => {
// No more messages to process, so flush any messages buffered in the transport. // No more messages to process, so flush any messages buffered in the transport.
ready!(self.as_mut().transport().poll_flush(waker)?); ready!(self.as_mut().transport().poll_flush(cx)?);
// Even if we fully-flush, we return Pending, because we have no more requests // Even if we fully-flush, we return Pending, because we have no more requests
// or cancellations right now. // or cancellations right now.
@@ -373,7 +379,7 @@ where
/// Yields the next pending request, if one is ready to be sent. /// Yields the next pending request, if one is ready to be sent.
fn poll_next_request( fn poll_next_request(
self: &mut Pin<&mut Self>, self: &mut Pin<&mut Self>,
waker: &Waker, cx: &mut Context<'_>,
) -> PollIo<DispatchRequest<Req, Resp>> { ) -> PollIo<DispatchRequest<Req, Resp>> {
if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests { if self.as_mut().in_flight_requests().len() >= self.config.max_in_flight_requests {
info!( info!(
@@ -387,13 +393,13 @@ where
return Poll::Pending; return Poll::Pending;
} }
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? {
// We can't yield a request-to-be-sent before the transport is capable of buffering it. // We can't yield a request-to-be-sent before the transport is capable of buffering it.
ready!(self.as_mut().transport().poll_flush(waker)?); ready!(self.as_mut().transport().poll_flush(cx)?);
} }
loop { loop {
match ready!(self.as_mut().pending_requests().poll_next_unpin(waker)) { match ready!(self.as_mut().pending_requests().poll_next_unpin(cx)) {
Some(request) => { Some(request) => {
if request.response_completion.is_canceled() { if request.response_completion.is_canceled() {
trace!( trace!(
@@ -416,14 +422,15 @@ where
/// Yields the next pending cancellation, and, if one is ready, cancels the associated request. /// Yields the next pending cancellation, and, if one is ready, cancels the associated request.
fn poll_next_cancellation( fn poll_next_cancellation(
self: &mut Pin<&mut Self>, self: &mut Pin<&mut Self>,
waker: &Waker, cx: &mut Context<'_>,
) -> PollIo<(context::Context, u64)> { ) -> PollIo<(context::Context, u64)> {
while let Poll::Pending = self.as_mut().transport().poll_ready(waker)? { while let Poll::Pending = self.as_mut().transport().poll_ready(cx)? {
ready!(self.as_mut().transport().poll_flush(waker)?); ready!(self.as_mut().transport().poll_flush(cx)?);
} }
loop { loop {
match ready!(self.as_mut().canceled_requests().poll_next_unpin(waker)) { let cancellation = self.as_mut().canceled_requests().poll_next_unpin(cx);
match ready!(cancellation) {
Some(request_id) => { Some(request_id) => {
if let Some(in_flight_data) = if let Some(in_flight_data) =
self.as_mut().in_flight_requests().remove(&request_id) self.as_mut().in_flight_requests().remove(&request_id)
@@ -530,10 +537,10 @@ where
{ {
type Output = io::Result<()>; type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<io::Result<()>> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr()); trace!("[{}] RequestDispatch::poll", self.as_mut().server_addr());
loop { loop {
match (self.pump_read(waker)?, self.pump_write(waker)?) { match (self.pump_read(cx)?, self.pump_write(cx)?) {
(read, write @ Poll::Ready(None)) => { (read, write @ Poll::Ready(None)) => {
if self.as_mut().in_flight_requests().is_empty() { if self.as_mut().in_flight_requests().is_empty() {
info!( info!(
@@ -620,8 +627,8 @@ impl RequestCancellation {
impl Stream for CanceledRequests { impl Stream for CanceledRequests {
type Item = u64; type Item = u64;
fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<u64>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
self.0.poll_next_unpin(waker) self.0.poll_next_unpin(cx)
} }
} }
@@ -652,8 +659,8 @@ where
{ {
type Output = io::Result<Fut::Ok>; type Output = io::Result<Fut::Ok>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().future().try_poll(waker) { match self.as_mut().future().try_poll(cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(result) => { Poll::Ready(result) => {
self.finished().take().expect( self.finished().take().expect(
@@ -692,8 +699,8 @@ where
{ {
type Output = Result<DispatchResponse<Resp>, Fut::Error>; type Output = Result<DispatchResponse<Resp>, Fut::Error>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().future().try_poll(waker) { match self.as_mut().future().try_poll(cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(result) => { Poll::Ready(result) => {
let response = self let response = self
@@ -735,8 +742,8 @@ where
{ {
type Output = Result<Fut2::Ok, Fut2::Error>; type Output = Result<Fut2::Ok, Fut2::Error>;
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.try_chain().poll(waker, |result| match result { self.try_chain().poll(cx, |result| match result {
Ok(ok) => TryChainAction::Future(ok), Ok(ok) => TryChainAction::Future(ok),
Err(err) => TryChainAction::Output(Err(err)), Err(err) => TryChainAction::Output(Err(err)),
}) })
@@ -768,7 +775,11 @@ where
TryChain::First(fut1) TryChain::First(fut1)
} }
fn poll<F>(self: Pin<&mut Self>, waker: &Waker, f: F) -> Poll<Result<Fut2::Ok, Fut2::Error>> fn poll<F>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
f: F,
) -> Poll<Result<Fut2::Ok, Fut2::Error>>
where where
F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>, F: FnOnce(Result<Fut1::Ok, Fut1::Error>) -> TryChainAction<Fut2>,
{ {
@@ -781,14 +792,14 @@ where
let output = match this { let output = match this {
TryChain::First(fut1) => { TryChain::First(fut1) => {
// Poll the first future // Poll the first future
match unsafe { Pin::new_unchecked(fut1) }.try_poll(waker) { match unsafe { Pin::new_unchecked(fut1) }.try_poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(output) => output, Poll::Ready(output) => output,
} }
} }
TryChain::Second(fut2) => { TryChain::Second(fut2) => {
// Poll the second future // Poll the second future
return unsafe { Pin::new_unchecked(fut2) }.try_poll(waker); return unsafe { Pin::new_unchecked(fut2) }.try_poll(cx);
} }
TryChain::Empty => { TryChain::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`"); panic!("future must not be polled after it returned `Poll::Ready`");
@@ -807,7 +818,9 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{CanceledRequests, Channel, RequestCancellation, RequestDispatch}; use super::{
CanceledRequests, Channel, DispatchResponse, RequestCancellation, RequestDispatch,
};
use crate::{ use crate::{
client::Config, client::Config,
context, context,
@@ -815,7 +828,7 @@ mod tests {
ClientMessage, Response, ClientMessage, Response,
}; };
use fnv::FnvHashMap; use fnv::FnvHashMap;
use futures::{channel::mpsc, prelude::*, Poll}; use futures::{channel::mpsc, prelude::*, task::Context, Poll};
use futures_test::task::noop_waker_ref; use futures_test::task::noop_waker_ref;
use std::{ use std::{
marker, marker,
@@ -828,20 +841,12 @@ mod tests {
#[test] #[test]
fn stage_request() { fn stage_request() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
// Test that a request future dropped before it's processed by dispatch will cause the request
// to not be added to the in-flight request map.
let _resp = tokio::runtime::current_thread::block_on_all(
channel
.send(context::current(), "hi".to_string())
.boxed()
.compat(),
);
let mut dispatch = Pin::new(&mut dispatch); let mut dispatch = Pin::new(&mut dispatch);
let waker = &noop_waker_ref(); let cx = &mut Context::from_waker(&noop_waker_ref());
let req = dispatch.poll_next_request(waker).ready(); let _resp = send_request(&mut channel, "hi");
let req = dispatch.poll_next_request(cx).ready();
assert!(req.is_some()); assert!(req.is_some());
let req = req.unwrap(); let req = req.unwrap();
@@ -849,49 +854,77 @@ mod tests {
assert_eq!(req.request, "hi".to_string()); assert_eq!(req.request, "hi".to_string());
} }
// Regression test for https://github.com/google/tarpc/issues/220
#[test] #[test]
fn stage_request_response_future_dropped() { fn stage_request_channel_dropped_doesnt_panic() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, mut server_channel) = set_up();
let mut dispatch = Pin::new(&mut dispatch);
let cx = &mut Context::from_waker(&noop_waker_ref());
// Test that a request future dropped before it's processed by dispatch will cause the request let _ = send_request(&mut channel, "hi");
// to not be added to the in-flight request map.
let resp = tokio::runtime::current_thread::block_on_all(
channel
.send(context::current(), "hi".into())
.boxed()
.compat(),
)
.unwrap();
drop(resp);
drop(channel); drop(channel);
let mut dispatch = Pin::new(&mut dispatch); assert!(dispatch.as_mut().poll(cx).is_ready());
let waker = &noop_waker_ref(); send_response(
&mut server_channel,
dispatch.poll_next_cancellation(waker).unwrap(); Response {
assert!(dispatch.poll_next_request(waker).ready().is_none()); request_id: 0,
message: Ok("hello".into()),
},
);
tokio::runtime::current_thread::block_on_all(dispatch.boxed().compat()).unwrap();
} }
#[test] #[test]
fn stage_request_response_future_closed() { fn stage_request_response_future_dropped_is_canceled_before_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let mut dispatch = Pin::new(&mut dispatch);
let cx = &mut Context::from_waker(&noop_waker_ref());
let _ = send_request(&mut channel, "hi");
// Drop the channel so polling returns none if no requests are currently ready.
drop(channel);
// Test that a request future dropped before it's processed by dispatch will cause the request
// to not be added to the in-flight request map.
assert!(dispatch.poll_next_request(cx).ready().is_none());
}
#[test]
fn stage_request_response_future_dropped_is_canceled_after_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
let mut dispatch = Pin::new(&mut dispatch);
let req = send_request(&mut channel, "hi");
assert!(dispatch.as_mut().pump_write(cx).ready().is_some());
assert!(!dispatch.as_mut().in_flight_requests().is_empty());
// Test that a request future dropped after it's processed by dispatch will cause the request
// to be removed from the in-flight request map.
drop(req);
if let Poll::Ready(Some(_)) = dispatch.as_mut().poll_next_cancellation(cx).unwrap() {
// ok
} else {
panic!("Expected request to be cancelled")
};
assert!(dispatch.in_flight_requests().is_empty());
}
#[test]
fn stage_request_response_closed_skipped() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let mut dispatch = Pin::new(&mut dispatch);
let cx = &mut Context::from_waker(&noop_waker_ref());
// Test that a request future that's closed its receiver but not yet canceled its request -- // Test that a request future that's closed its receiver but not yet canceled its request --
// i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request // i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request
// map. // map.
let resp = tokio::runtime::current_thread::block_on_all( let mut resp = send_request(&mut channel, "hi");
channel resp.response.get_mut().close();
.send(context::current(), "hi".into())
.boxed()
.compat(),
)
.unwrap();
drop(resp);
drop(channel);
let mut dispatch = Pin::new(&mut dispatch); assert!(dispatch.poll_next_request(cx).is_pending());
let waker = &noop_waker_ref();
assert!(dispatch.poll_next_request(waker).ready().is_none());
} }
fn set_up() -> ( fn set_up() -> (
@@ -908,7 +941,7 @@ mod tests {
let dispatch = RequestDispatch::<String, String, _> { let dispatch = RequestDispatch::<String, String, _> {
transport: client_channel.fuse(), transport: client_channel.fuse(),
pending_requests: pending_requests.fuse(), pending_requests: pending_requests.fuse(),
canceled_requests: CanceledRequests(canceled_requests), canceled_requests: CanceledRequests(canceled_requests).fuse(),
in_flight_requests: FnvHashMap::default(), in_flight_requests: FnvHashMap::default(),
config: Config::default(), config: Config::default(),
server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
@@ -925,6 +958,27 @@ mod tests {
(dispatch, channel, server_channel) (dispatch, channel, server_channel)
} }
fn send_request(
channel: &mut Channel<String, String>,
request: &str,
) -> DispatchResponse<String> {
tokio::runtime::current_thread::block_on_all(
channel
.send(context::current(), request.to_string())
.boxed()
.compat(),
)
.unwrap()
}
fn send_response(
channel: &mut UnboundedChannel<ClientMessage<String>, Response<String>>,
response: Response<String>,
) {
tokio::runtime::current_thread::block_on_all(channel.send(response).boxed().compat())
.unwrap();
}
trait PollTest { trait PollTest {
type T; type T;
fn unwrap(self) -> Poll<Self::T>; fn unwrap(self) -> Poll<Self::T>;
@@ -955,5 +1009,4 @@ mod tests {
} }
} }
} }
} }

View File

@@ -147,5 +147,5 @@ where
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0) SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
}); });
Ok(await!(channel::spawn(config, transport, server_addr))?) Ok(channel::spawn(config, transport, server_addr).await?)
} }

View File

@@ -8,9 +8,7 @@
non_exhaustive, non_exhaustive,
integer_atomics, integer_atomics,
try_trait, try_trait,
futures_api,
arbitrary_self_types, arbitrary_self_types,
await_macro,
async_await async_await
)] )]
#![deny(missing_docs, missing_debug_implementations)] #![deny(missing_docs, missing_debug_implementations)]
@@ -156,7 +154,7 @@ thread_local! {
// INIT must always be called before accessing SPAWN. // INIT must always be called before accessing SPAWN.
// Otherwise, accessing SPAWN can trigger undefined behavior due to race conditions. // Otherwise, accessing SPAWN can trigger undefined behavior due to race conditions.
INIT.call_once(|| {}); INIT.call_once(|| {});
RefCell::new(SEED_SPAWN.clone().expect("init() must be called.")) RefCell::new(SEED_SPAWN.as_ref().expect("init() must be called.").box_clone())
} }
}; };
} }
@@ -182,12 +180,6 @@ trait CloneSpawn: Spawn {
fn box_clone(&self) -> Box<dyn CloneSpawn>; fn box_clone(&self) -> Box<dyn CloneSpawn>;
} }
impl Clone for Box<dyn CloneSpawn> {
fn clone(&self) -> Self {
self.box_clone()
}
}
impl<S: Spawn + Clone + 'static> CloneSpawn for S { impl<S: Spawn + Clone + 'static> CloneSpawn for S {
fn box_clone(&self) -> Box<dyn CloneSpawn> { fn box_clone(&self) -> Box<dyn CloneSpawn> {
Box::new(self.clone()) Box::new(self.clone())

View File

@@ -15,7 +15,7 @@ use futures::{
prelude::*, prelude::*,
ready, ready,
stream::Fuse, stream::Fuse,
task::{Poll, Waker}, task::{Context, Poll},
}; };
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use pin_utils::unsafe_pinned; use pin_utils::unsafe_pinned;
@@ -197,7 +197,10 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
} }
} }
fn poll_listener<C>(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<NewConnection<Req, Resp, C>> fn poll_listener<C>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> PollIo<NewConnection<Req, Resp, C>>
where where
S: Stream<Item = Result<C, io::Error>>, S: Stream<Item = Result<C, io::Error>>,
C: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send, C: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
@@ -208,7 +211,10 @@ impl<S, Req, Resp> ConnectionFilter<S, Req, Resp> {
} }
} }
fn poll_closed_connections(self: &mut Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> { fn poll_closed_connections(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) { match ready!(self.as_mut().closed_connections_rx().poll_next_unpin(cx)) {
Some(addr) => { Some(addr) => {
self.handle_closed_connection(&addr); self.handle_closed_connection(&addr);
@@ -226,7 +232,7 @@ where
{ {
type Item = io::Result<Channel<Req, Resp, T>>; type Item = io::Result<Channel<Req, Resp, T>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<Channel<Req, Resp, T>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Channel<Req, Resp, T>> {
loop { loop {
match ( match (
self.as_mut().poll_listener(cx)?, self.as_mut().poll_listener(cx)?,

View File

@@ -7,7 +7,7 @@
//! Provides a server that concurrently handles many connections sending multiplexed requests. //! Provides a server that concurrently handles many connections sending multiplexed requests.
use crate::{ use crate::{
context::Context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage, context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage,
ClientMessageKind, PollIo, Request, Response, ServerError, Transport, ClientMessageKind, PollIo, Request, Response, ServerError, Transport,
}; };
use fnv::FnvHashMap; use fnv::FnvHashMap;
@@ -17,7 +17,7 @@ use futures::{
prelude::*, prelude::*,
ready, ready,
stream::Fuse, stream::Fuse,
task::{Poll, Waker}, task::{Context, Poll},
try_ready, try_ready,
}; };
use humantime::format_rfc3339; use humantime::format_rfc3339;
@@ -128,12 +128,12 @@ where
Req: Send + 'static, Req: Send + 'static,
Resp: Send + 'static, Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static, T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static, Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{ {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<()> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) { while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
match channel { match channel {
Ok(channel) => { Ok(channel) => {
@@ -165,7 +165,7 @@ where
/// Responds to all requests with `request_handler`. /// Responds to all requests with `request_handler`.
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F> fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
where where
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static, Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{ {
Running { Running {
@@ -234,15 +234,24 @@ where
self.as_mut().transport().start_send(response) self.as_mut().transport().start_send(response)
} }
pub(crate) fn poll_ready(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> { pub(crate) fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.as_mut().transport().poll_ready(cx) self.as_mut().transport().poll_ready(cx)
} }
pub(crate) fn poll_flush(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> { pub(crate) fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.as_mut().transport().poll_flush(cx) self.as_mut().transport().poll_flush(cx)
} }
pub(crate) fn poll_next(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<ClientMessage<Req>> { pub(crate) fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> PollIo<ClientMessage<Req>> {
self.as_mut().transport().poll_next(cx) self.as_mut().transport().poll_next(cx)
} }
@@ -255,7 +264,7 @@ where
/// responses and resolves when the connection is closed. /// responses and resolves when the connection is closed.
pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()> pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()>
where where
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static, Fut: Future<Output = io::Result<Resp>> + Send + 'static,
Req: 'static, Req: 'static,
Resp: 'static, Resp: 'static,
@@ -281,9 +290,9 @@ where
struct ClientHandler<Req, Resp, T, F> { struct ClientHandler<Req, Resp, T, F> {
channel: Channel<Req, Resp, T>, channel: Channel<Req, Resp, T>,
/// Responses waiting to be written to the wire. /// Responses waiting to be written to the wire.
pending_responses: Fuse<mpsc::Receiver<(Context, Response<Resp>)>>, pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<Resp>)>>,
/// Handed out to request handlers to fan in responses. /// Handed out to request handlers to fan in responses.
responses_tx: mpsc::Sender<(Context, Response<Resp>)>, responses_tx: mpsc::Sender<(context::Context, Response<Resp>)>,
/// Number of requests currently being responded to. /// Number of requests currently being responded to.
in_flight_requests: FnvHashMap<u64, AbortHandle>, in_flight_requests: FnvHashMap<u64, AbortHandle>,
/// Request handler. /// Request handler.
@@ -293,8 +302,8 @@ struct ClientHandler<Req, Resp, T, F> {
impl<Req, Resp, T, F> ClientHandler<Req, Resp, T, F> { impl<Req, Resp, T, F> ClientHandler<Req, Resp, T, F> {
unsafe_pinned!(channel: Channel<Req, Resp, T>); unsafe_pinned!(channel: Channel<Req, Resp, T>);
unsafe_pinned!(in_flight_requests: FnvHashMap<u64, AbortHandle>); unsafe_pinned!(in_flight_requests: FnvHashMap<u64, AbortHandle>);
unsafe_pinned!(pending_responses: Fuse<mpsc::Receiver<(Context, Response<Resp>)>>); unsafe_pinned!(pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<Resp>)>>);
unsafe_pinned!(responses_tx: mpsc::Sender<(Context, Response<Resp>)>); unsafe_pinned!(responses_tx: mpsc::Sender<(context::Context, Response<Resp>)>);
// For this to be safe, field f must be private, and code in this module must never // For this to be safe, field f must be private, and code in this module must never
// construct PinMut<F>. // construct PinMut<F>.
unsafe_unpinned!(f: F); unsafe_unpinned!(f: F);
@@ -305,12 +314,15 @@ where
Req: Send + 'static, Req: Send + 'static,
Resp: Send + 'static, Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send, T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static, Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{ {
/// If at max in-flight requests, check that there's room to immediately write a throttled /// If at max in-flight requests, check that there's room to immediately write a throttled
/// response. /// response.
fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> { fn poll_ready_if_throttling(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
if self.in_flight_requests.len() if self.in_flight_requests.len()
>= self.channel.config.max_in_flight_requests_per_connection >= self.channel.config.max_in_flight_requests_per_connection
{ {
@@ -328,7 +340,7 @@ where
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn pump_read(mut self: Pin<&mut Self>, cx: &Waker) -> PollIo<()> { fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
ready!(self.as_mut().poll_ready_if_throttling(cx)?); ready!(self.as_mut().poll_ready_if_throttling(cx)?);
Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) { Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) {
@@ -350,7 +362,11 @@ where
}) })
} }
fn pump_write(mut self: Pin<&mut Self>, cx: &Waker, read_half_closed: bool) -> PollIo<()> { fn pump_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
read_half_closed: bool,
) -> PollIo<()> {
match self.as_mut().poll_next_response(cx)? { match self.as_mut().poll_next_response(cx)? {
Poll::Ready(Some((_, response))) => { Poll::Ready(Some((_, response))) => {
self.as_mut().channel().start_send(response)?; self.as_mut().channel().start_send(response)?;
@@ -379,8 +395,8 @@ where
fn poll_next_response( fn poll_next_response(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &Waker, cx: &mut Context<'_>,
) -> PollIo<(Context, Response<Resp>)> { ) -> PollIo<(context::Context, Response<Resp>)> {
// Ensure there's room to write a response. // Ensure there's room to write a response.
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? { while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
ready!(self.as_mut().channel().poll_flush(cx)?); ready!(self.as_mut().channel().poll_flush(cx)?);
@@ -421,7 +437,7 @@ where
) -> io::Result<()> { ) -> io::Result<()> {
let request_id = request.id; let request_id = request.id;
let peer = self.as_mut().channel().client_addr; let peer = self.as_mut().channel().client_addr;
let ctx = Context { let ctx = context::Context {
deadline: request.deadline, deadline: request.deadline,
trace_context, trace_context,
}; };
@@ -478,7 +494,10 @@ where
}, },
}; };
trace!("[{}/{}] Sending response.", trace_id, peer); trace!("[{}/{}] Sending response.", trace_id, peer);
await!(response_tx.send((ctx, response)).unwrap_or_else(|_| ())); response_tx
.send((ctx, response))
.unwrap_or_else(|_| ())
.await;
}, },
); );
let (abortable_response, abort_handle) = abortable(response); let (abortable_response, abort_handle) = abortable(response);
@@ -527,12 +546,12 @@ where
Req: Send + 'static, Req: Send + 'static,
Resp: Send + 'static, Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send, T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
F: FnOnce(Context, Req) -> Fut + Send + 'static + Clone, F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static, Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{ {
type Output = io::Result<()>; type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
trace!("[{}] ClientHandler::poll", self.channel.client_addr); trace!("[{}] ClientHandler::poll", self.channel.client_addr);
loop { loop {
let read = self.as_mut().pump_read(cx)?; let read = self.as_mut().pump_read(cx)?;

View File

@@ -7,7 +7,7 @@
//! Transports backed by in-memory channels. //! Transports backed by in-memory channels.
use crate::{PollIo, Transport}; use crate::{PollIo, Transport};
use futures::{channel::mpsc, task::Waker, Poll, Sink, Stream}; use futures::{channel::mpsc, task::Context, Poll, Sink, Stream};
use pin_utils::unsafe_pinned; use pin_utils::unsafe_pinned;
use std::pin::Pin; use std::pin::Pin;
use std::{ use std::{
@@ -45,16 +45,15 @@ impl<Item, SinkItem> UnboundedChannel<Item, SinkItem> {
impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> { impl<Item, SinkItem> Stream for UnboundedChannel<Item, SinkItem> {
type Item = Result<Item, io::Error>; type Item = Result<Item, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &Waker) -> PollIo<Item> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<Item> {
self.rx().poll_next(cx).map(|option| option.map(Ok)) self.rx().poll_next(cx).map(|option| option.map(Ok))
} }
} }
impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> { impl<Item, SinkItem> Sink<SinkItem> for UnboundedChannel<Item, SinkItem> {
type SinkItem = SinkItem;
type SinkError = io::Error; type SinkError = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> { fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.tx() self.tx()
.poll_ready(cx) .poll_ready(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
@@ -66,13 +65,13 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
} }
fn poll_flush(self: Pin<&mut Self>, cx: &Waker) -> Poll<Result<(), Self::SinkError>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
self.tx() self.tx()
.poll_flush(cx) .poll_flush(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
} }
fn poll_close(self: Pin<&mut Self>, cx: &Waker) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.tx() self.tx()
.poll_close(cx) .poll_close(cx)
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected)) .map_err(|_| io::Error::from(io::ErrorKind::NotConnected))
@@ -80,8 +79,8 @@ impl<Item, SinkItem> Sink for UnboundedChannel<Item, SinkItem> {
} }
impl<Item, SinkItem> Transport for UnboundedChannel<Item, SinkItem> { impl<Item, SinkItem> Transport for UnboundedChannel<Item, SinkItem> {
type Item = Item;
type SinkItem = SinkItem; type SinkItem = SinkItem;
type Item = Item;
fn peer_addr(&self) -> io::Result<SocketAddr> { fn peer_addr(&self) -> io::Result<SocketAddr> {
Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)) Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
@@ -122,16 +121,19 @@ mod tests {
}); });
let responses = async { let responses = async {
let mut client = await!(client::new(client::Config::default(), client_channel))?; let mut client = client::new(client::Config::default(), client_channel).await?;
let response1 = await!(client.call(context::current(), "123".into())); let response1 = client.call(context::current(), "123".into()).await;
let response2 = await!(client.call(context::current(), "abc".into())); let response2 = client.call(context::current(), "abc".into()).await;
Ok::<_, io::Error>((response1, response2)) Ok::<_, io::Error>((response1, response2))
}; };
let (response1, response2) = let (response1, response2) = run_future(future::join(
run_future(server.join(responses.unwrap_or_else(|e| panic!(e)))).1; server,
responses.unwrap_or_else(|e| panic!(e)),
))
.1;
trace!("response1: {:?}, response2: {:?}", response1, response2); trace!("response1: {:?}, response2: {:?}", response1, response2);

View File

@@ -12,9 +12,10 @@
use futures::prelude::*; use futures::prelude::*;
use std::{ use std::{
io, io,
marker::PhantomData,
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
task::{Poll, Waker}, task::{Context, Poll},
}; };
pub mod channel; pub mod channel;
@@ -23,7 +24,7 @@ pub mod channel;
pub trait Transport pub trait Transport
where where
Self: Stream<Item = io::Result<<Self as Transport>::Item>>, Self: Stream<Item = io::Result<<Self as Transport>::Item>>,
Self: Sink<SinkItem = <Self as Transport>::SinkItem, SinkError = io::Error>, Self: Sink<<Self as Transport>::SinkItem, SinkError = io::Error>,
{ {
/// The type read off the transport. /// The type read off the transport.
type Item; type Item;
@@ -37,77 +38,78 @@ where
} }
/// Returns a new Transport backed by the given Stream + Sink and connecting addresses. /// Returns a new Transport backed by the given Stream + Sink and connecting addresses.
pub fn new<S, Item>( pub fn new<S, SinkItem, Item>(
inner: S, inner: S,
peer_addr: SocketAddr, peer_addr: SocketAddr,
local_addr: SocketAddr, local_addr: SocketAddr,
) -> impl Transport<Item = Item, SinkItem = S::SinkItem> ) -> impl Transport<Item = Item, SinkItem = SinkItem>
where where
S: Stream<Item = io::Result<Item>>, S: Stream<Item = io::Result<Item>>,
S: Sink<SinkError = io::Error>, S: Sink<SinkItem, SinkError = io::Error>,
{ {
TransportShim { TransportShim {
inner, inner,
peer_addr, peer_addr,
local_addr, local_addr,
_marker: PhantomData,
} }
} }
/// A transport created by adding peers to a Stream + Sink. /// A transport created by adding peers to a Stream + Sink.
#[derive(Debug)] #[derive(Debug)]
struct TransportShim<S> { struct TransportShim<S, SinkItem> {
peer_addr: SocketAddr, peer_addr: SocketAddr,
local_addr: SocketAddr, local_addr: SocketAddr,
inner: S, inner: S,
_marker: PhantomData<SinkItem>,
} }
impl<S> TransportShim<S> { impl<S, SinkItem> TransportShim<S, SinkItem> {
pin_utils::unsafe_pinned!(inner: S); pin_utils::unsafe_pinned!(inner: S);
} }
impl<S> Stream for TransportShim<S> impl<S, SinkItem> Stream for TransportShim<S, SinkItem>
where where
S: Stream, S: Stream,
{ {
type Item = S::Item; type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<S::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
self.inner().poll_next(waker) self.inner().poll_next(cx)
} }
} }
impl<S> Sink for TransportShim<S> impl<S, Item> Sink<Item> for TransportShim<S, Item>
where where
S: Sink, S: Sink<Item>,
{ {
type SinkItem = S::SinkItem;
type SinkError = S::SinkError; type SinkError = S::SinkError;
fn start_send(self: Pin<&mut Self>, item: S::SinkItem) -> Result<(), S::SinkError> { fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), S::SinkError> {
self.inner().start_send(item) self.inner().start_send(item)
} }
fn poll_ready(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> { fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_ready(waker) self.inner().poll_ready(cx)
} }
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_flush(waker) self.inner().poll_flush(cx)
} }
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), S::SinkError>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::SinkError>> {
self.inner().poll_close(waker) self.inner().poll_close(cx)
} }
} }
impl<S, Item> Transport for TransportShim<S> impl<S, SinkItem, Item> Transport for TransportShim<S, SinkItem>
where where
S: Stream + Sink, S: Stream + Sink<SinkItem>,
Self: Stream<Item = io::Result<Item>>, Self: Stream<Item = io::Result<Item>>,
Self: Sink<SinkItem = S::SinkItem, SinkError = io::Error>, Self: Sink<SinkItem, SinkError = io::Error>,
{ {
type Item = Item; type Item = Item;
type SinkItem = S::SinkItem; type SinkItem = SinkItem;
/// The address of the remote peer this transport is in communication with. /// The address of the remote peer this transport is in communication with.
fn peer_addr(&self) -> io::Result<SocketAddr> { fn peer_addr(&self) -> io::Result<SocketAddr> {

View File

@@ -5,10 +5,10 @@
// https://opensource.org/licenses/MIT. // https://opensource.org/licenses/MIT.
use futures::{ use futures::{
compat::{Compat01As03, Future01CompatExt}, compat::*,
prelude::*, prelude::*,
ready, ready,
task::{Poll, Waker}, task::{Context, Poll},
}; };
use pin_utils::unsafe_pinned; use pin_utils::unsafe_pinned;
use std::pin::Pin; use std::pin::Pin;
@@ -50,15 +50,15 @@ where
{ {
type Output = Result<T::Ok, timeout::Error<T::Error>>; type Output = Result<T::Ok, timeout::Error<T::Error>>;
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// First, try polling the future // First, try polling the future
match self.as_mut().future().try_poll(waker) { match self.as_mut().future().try_poll(cx) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))), Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))),
} }
let delay = self.delay().poll_unpin(waker); let delay = self.delay().poll_unpin(cx);
// Now check the timer // Now check the timer
match ready!(delay) { match ready!(delay) {

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc" name = "tarpc"
version = "0.15.0" version = "0.18.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018" edition = "2018"
license = "MIT" license = "MIT"
@@ -19,20 +19,17 @@ serde1 = ["rpc/serde1", "serde", "serde/derive"]
travis-ci = { repository = "google/tarpc" } travis-ci = { repository = "google/tarpc" }
[dependencies] [dependencies]
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
log = "0.4" log = "0.4"
serde = { optional = true, version = "1.0" } serde = { optional = true, version = "1.0" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.6" }
tarpc-plugins = { path = "../plugins", version = "0.5.0" } tarpc-plugins = { path = "../plugins", version = "0.5.0" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.3" }
[target.'cfg(not(test))'.dependencies]
futures-preview = "0.3.0-alpha.13"
[dev-dependencies] [dev-dependencies]
bincode = "1.0" bincode = "1"
bytes = { version = "0.4", features = ["serde"] } bytes = { version = "0.4", features = ["serde"] }
humantime = "1.0" humantime = "1.0"
futures-preview = { version = "0.3.0-alpha.13", features = ["compat"] } bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.4", path = "../bincode-transport" }
env_logger = "0.6" env_logger = "0.6"
libtest = "0.0.1" libtest = "0.0.1"
tokio = "0.1" tokio = "0.1"

View File

@@ -6,8 +6,6 @@
#![feature( #![feature(
arbitrary_self_types, arbitrary_self_types,
futures_api,
await_macro,
async_await, async_await,
existential_type, existential_type,
proc_macro_hygiene proc_macro_hygiene
@@ -100,7 +98,7 @@ impl publisher::Service for Publisher {
// Ignore failing subscribers. In a real pubsub, // Ignore failing subscribers. In a real pubsub,
// you'd want to continually retry until subscribers // you'd want to continually retry until subscribers
// ack. // ack.
let _ = await!(client.receive(context::current(), message.clone())); let _ = client.receive(context::current(), message.clone()).await;
} }
} }
@@ -115,8 +113,8 @@ impl publisher::Service for Publisher {
id: u32, id: u32,
addr: SocketAddr, addr: SocketAddr,
) -> io::Result<()> { ) -> io::Result<()> {
let conn = await!(bincode_transport::connect(&addr))?; let conn = bincode_transport::connect(&addr).await?;
let subscriber = await!(subscriber::new_stub(client::Config::default(), conn))?; let subscriber = subscriber::new_stub(client::Config::default(), conn).await?;
println!("Subscribing {}.", id); println!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber); clients.lock().unwrap().insert(id, subscriber);
Ok(()) Ok(())
@@ -154,27 +152,34 @@ async fn run() -> io::Result<()> {
.compat(), .compat(),
); );
let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?; let subscriber1 = Subscriber::listen(0, server::Config::default()).await?;
let subscriber2 = await!(Subscriber::listen(1, server::Config::default()))?; let subscriber2 = Subscriber::listen(1, server::Config::default()).await?;
let publisher_conn = bincode_transport::connect(&publisher_addr); let publisher_conn = bincode_transport::connect(&publisher_addr);
let publisher_conn = await!(publisher_conn)?; let publisher_conn = publisher_conn.await?;
let mut publisher = await!(publisher::new_stub( let mut publisher = publisher::new_stub(client::Config::default(), publisher_conn).await?;
client::Config::default(),
publisher_conn
))?;
if let Err(e) = await!(publisher.subscribe(context::current(), 0, subscriber1))? { if let Err(e) = publisher
.subscribe(context::current(), 0, subscriber1)
.await?
{
eprintln!("Couldn't subscribe subscriber 0: {}", e); eprintln!("Couldn't subscribe subscriber 0: {}", e);
} }
if let Err(e) = await!(publisher.subscribe(context::current(), 1, subscriber2))? { if let Err(e) = publisher
.subscribe(context::current(), 1, subscriber2)
.await?
{
eprintln!("Couldn't subscribe subscriber 1: {}", e); eprintln!("Couldn't subscribe subscriber 1: {}", e);
} }
println!("Broadcasting..."); println!("Broadcasting...");
await!(publisher.broadcast(context::current(), "hello to all".to_string()))?; publisher
await!(publisher.unsubscribe(context::current(), 1))?; .broadcast(context::current(), "hello to all".to_string())
await!(publisher.broadcast(context::current(), "hi again".to_string()))?; .await?;
publisher.unsubscribe(context::current(), 1).await?;
publisher
.broadcast(context::current(), "hi again".to_string())
.await?;
Ok(()) Ok(())
} }

View File

@@ -4,13 +4,7 @@
// license that can be found in the LICENSE file or at // license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT. // https://opensource.org/licenses/MIT.
#![feature( #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
futures_api,
arbitrary_self_types,
await_macro,
async_await,
proc_macro_hygiene
)]
use futures::{ use futures::{
compat::Executor01CompatExt, compat::Executor01CompatExt,
@@ -64,17 +58,17 @@ async fn run() -> io::Result<()> {
tokio_executor::spawn(server.unit_error().boxed().compat()); tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&addr))?; let transport = bincode_transport::connect(&addr).await?;
// new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any // new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro. // Transport as input, and returns a Client, also generated by the macro.
// by the service mcro. // by the service mcro.
let mut client = await!(new_stub(client::Config::default(), transport))?; let mut client = new_stub(client::Config::default(), transport).await?;
// The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args // The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context // as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests. // specifies a deadline and trace information which can be helpful in debugging requests.
let hello = await!(client.hello(context::current(), "Stim".to_string()))?; let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello); println!("{}", hello);

View File

@@ -7,8 +7,6 @@
#![feature( #![feature(
existential_type, existential_type,
arbitrary_self_types, arbitrary_self_types,
futures_api,
await_macro,
async_await, async_await,
proc_macro_hygiene proc_macro_hygiene
)] )]
@@ -60,8 +58,10 @@ impl DoubleService for DoubleServer {
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut { fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
async fn double(mut client: add::Client, x: i32) -> Result<i32, String> { async fn double(mut client: add::Client, x: i32) -> Result<i32, String> {
let result = await!(client.add(context::current(), x, x)); client
result.map_err(|e| e.to_string()) .add(context::current(), x, x)
.await
.map_err(|e| e.to_string())
} }
double(self.add_client.clone(), x) double(self.add_client.clone(), x)
@@ -77,8 +77,8 @@ async fn run() -> io::Result<()> {
.respond_with(add::serve(AddServer)); .respond_with(add::serve(AddServer));
tokio_executor::spawn(add_server.unit_error().boxed().compat()); tokio_executor::spawn(add_server.unit_error().boxed().compat());
let to_add_server = await!(bincode_transport::connect(&addr))?; let to_add_server = bincode_transport::connect(&addr).await?;
let add_client = await!(add::new_stub(client::Config::default(), to_add_server))?; let add_client = add::new_stub(client::Config::default(), to_add_server).await?;
let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?; let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = double_listener.local_addr(); let addr = double_listener.local_addr();
@@ -88,14 +88,11 @@ async fn run() -> io::Result<()> {
.respond_with(double::serve(DoubleServer { add_client })); .respond_with(double::serve(DoubleServer { add_client }));
tokio_executor::spawn(double_server.unit_error().boxed().compat()); tokio_executor::spawn(double_server.unit_error().boxed().compat());
let to_double_server = await!(bincode_transport::connect(&addr))?; let to_double_server = bincode_transport::connect(&addr).await?;
let mut double_client = await!(double::new_stub( let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?;
client::Config::default(),
to_double_server
))?;
for i in 1..=5 { for i in 1..=5 {
println!("{:?}", await!(double_client.double(context::current(), i))?); println!("{:?}", double_client.double(context::current(), i).await?);
} }
Ok(()) Ok(())
} }

View File

@@ -1,7 +1,5 @@
#![feature( #![feature(
async_await, async_await,
await_macro,
futures_api,
arbitrary_self_types, arbitrary_self_types,
proc_macro_hygiene, proc_macro_hygiene,
impl_trait_in_bindings impl_trait_in_bindings
@@ -18,7 +16,7 @@ mod registry {
io, io,
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Poll, Waker}, task::{Context, Poll},
}; };
use tarpc::{ use tarpc::{
client::{self, Client}, client::{self, Client},
@@ -88,7 +86,7 @@ mod registry {
serve: move |cx, req: Bytes| { serve: move |cx, req: Bytes| {
async move { async move {
let req = deserialize.clone()(req)?; let req = deserialize.clone()(req)?;
let response = await!(serve.clone()(cx, req))?; let response = serve.clone()(cx, req).await?;
let response = serialize.clone()(response)?; let response = serialize.clone()(response)?;
Ok(ServiceResponse { response }) Ok(ServiceResponse { response })
} }
@@ -213,11 +211,11 @@ mod registry {
{ {
type Output = Output; type Output = Output;
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Output> {
unsafe { unsafe {
match Pin::get_unchecked_mut(self) { match Pin::get_unchecked_mut(self) {
Either::Left(car) => Pin::new_unchecked(car).poll(waker), Either::Left(car) => Pin::new_unchecked(car).poll(cx),
Either::Right(cdr) => Pin::new_unchecked(cdr).poll(waker), Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx),
} }
} }
} }
@@ -392,8 +390,8 @@ async fn run() -> io::Result<()> {
.respond_with(registry.serve()); .respond_with(registry.serve());
tokio_executor::spawn(server.unit_error().boxed().compat()); tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&server_addr))?; let transport = bincode_transport::connect(&server_addr).await?;
let channel = await!(client::new(client::Config::default(), transport))?; let channel = client::new(client::Config::default(), transport).await?;
let write_client = new_client("WriteService".to_string(), &channel); let write_client = new_client("WriteService".to_string(), &channel);
let mut write_client = write_service::Client::from(write_client); let mut write_client = write_service::Client::from(write_client);
@@ -401,8 +399,12 @@ async fn run() -> io::Result<()> {
let read_client = new_client("ReadService".to_string(), &channel); let read_client = new_client("ReadService".to_string(), &channel);
let mut read_client = read_service::Client::from(read_client); let mut read_client = read_service::Client::from(read_client);
await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?; write_client
let val = await!(read_client.read(context::current(), "key".to_string()))?; .write(context::current(), "key".to_string(), "val".to_string())
.await?;
let val = read_client
.read(context::current(), "key".to_string())
.await?;
println!("{:?}", val); println!("{:?}", val);
Ok(()) Ok(())

View File

@@ -7,10 +7,7 @@
#![doc(include = "../README.md")] #![doc(include = "../README.md")]
#![deny(missing_docs, missing_debug_implementations)] #![deny(missing_docs, missing_debug_implementations)]
#![feature(async_await, external_doc)] #![feature(async_await, external_doc)]
#![cfg_attr( #![cfg_attr(test, feature(proc_macro_hygiene, arbitrary_self_types))]
test,
feature(futures_api, await_macro, proc_macro_hygiene, arbitrary_self_types)
)]
#[doc(hidden)] #[doc(hidden)]
pub use futures; pub use futures;

View File

@@ -30,7 +30,7 @@ macro_rules! add_serde_if_enabled {
/// Rpc methods are specified, mirroring trait syntax: /// Rpc methods are specified, mirroring trait syntax:
/// ///
/// ``` /// ```
/// # #![feature(await_macro, pin, arbitrary_self_types, async_await, futures_api, proc_macro_hygiene)] /// # #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
/// # fn main() {} /// # fn main() {}
/// # tarpc::service! { /// # tarpc::service! {
/// /// Say hello /// /// Say hello
@@ -177,7 +177,7 @@ macro_rules! service {
impl<S: Service> ::std::future::Future for ResponseFut<S> { impl<S: Service> ::std::future::Future for ResponseFut<S> {
type Output = ::std::io::Result<Response>; type Output = ::std::io::Result<Response>;
fn poll(self: ::std::pin::Pin<&mut Self>, waker: &::std::task::Waker) fn poll(self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>)
-> ::std::task::Poll<::std::io::Result<Response>> -> ::std::task::Poll<::std::io::Result<Response>>
{ {
unsafe { unsafe {
@@ -185,7 +185,7 @@ macro_rules! service {
$( $(
ResponseFut::$fn_name(resp) => ResponseFut::$fn_name(resp) =>
::std::pin::Pin::new_unchecked(resp) ::std::pin::Pin::new_unchecked(resp)
.poll(waker) .poll(cx)
.map(Response::$fn_name) .map(Response::$fn_name)
.map(Ok), .map(Ok),
)* )*
@@ -222,7 +222,7 @@ macro_rules! service {
Item = $crate::Response<Response>, Item = $crate::Response<Response>,
SinkItem = $crate::ClientMessage<Request>> + Send + 'static, SinkItem = $crate::ClientMessage<Request>> + Send + 'static,
{ {
Ok(Client(await!($crate::client::new(config, transport))?)) Ok(Client($crate::client::new(config, transport).await?))
} }
impl<C> From<C> for Client<C> impl<C> From<C> for Client<C>
@@ -244,7 +244,7 @@ macro_rules! service {
let request__ = Request::$fn_name { $($arg,)* }; let request__ = Request::$fn_name { $($arg,)* };
let resp = $crate::Client::call(&mut self.0, ctx, request__); let resp = $crate::Client::call(&mut self.0, ctx, request__);
async move { async move {
match await!(resp)? { match resp.await? {
Response::$fn_name(msg__) => ::std::result::Result::Ok(msg__), Response::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
_ => unreachable!(), _ => unreachable!(),
} }
@@ -328,11 +328,11 @@ mod functional_test {
.compat(), .compat(),
); );
let mut client = await!(new_stub(client::Config::default(), tx))?; let mut client = new_stub(client::Config::default(), tx).await?;
assert_eq!(3, await!(client.add(context::current(), 1, 2))?); assert_eq!(3, client.add(context::current(), 1, 2).await?);
assert_eq!( assert_eq!(
"Hey, Tim.", "Hey, Tim.",
await!(client.hey(context::current(), "Tim".to_string()))? client.hey(context::current(), "Tim".to_string()).await?
); );
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
} }
@@ -357,7 +357,7 @@ mod functional_test {
.compat(), .compat(),
); );
let client = await!(new_stub(client::Config::default(), tx))?; let client = new_stub(client::Config::default(), tx).await?;
let mut c = client.clone(); let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2); let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone(); let mut c = client.clone();
@@ -365,9 +365,9 @@ mod functional_test {
let mut c = client.clone(); let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string()); let req3 = c.hey(context::current(), "Tim".to_string());
assert_eq!(3, await!(req1)?); assert_eq!(3, req1.await?);
assert_eq!(7, await!(req2)?); assert_eq!(7, req2.await?);
assert_eq!("Hey, Tim.", await!(req3)?); assert_eq!("Hey, Tim.", req3.await?);
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
} }
.map_err(|e| panic!("test failed: {}", e)); .map_err(|e| panic!("test failed: {}", e));

View File

@@ -8,9 +8,6 @@
test, test,
arbitrary_self_types, arbitrary_self_types,
integer_atomics, integer_atomics,
futures_api,
generators,
await_macro,
async_await, async_await,
proc_macro_hygiene proc_macro_hygiene
)] )]
@@ -57,8 +54,8 @@ async fn bench() -> io::Result<()> {
.compat(), .compat(),
); );
let conn = await!(bincode_transport::connect(&addr))?; let conn = bincode_transport::connect(&addr).await?;
let mut client = await!(ack::new_stub(client::Config::default(), conn))?; let mut client = ack::new_stub(client::Config::default(), conn).await?;
let total = 10_000usize; let total = 10_000usize;
let mut successful = 0u32; let mut successful = 0u32;
@@ -66,7 +63,7 @@ async fn bench() -> io::Result<()> {
let mut durations = vec![]; let mut durations = vec![];
for _ in 1..=total { for _ in 1..=total {
let now = Instant::now(); let now = Instant::now();
let response = await!(client.ack(context::current())); let response = client.ack(context::current()).await;
let elapsed = now.elapsed(); let elapsed = now.elapsed();
match response { match response {