5 Commits

Author SHA1 Message Date
Tim Kuehn
6745cee72c Bump tarpc to v0.18.0 2019-05-11 13:00:35 -07:00
Artem Vorotnikov
31abea18b3 Update to futures-preview 0.3.0-alpha.16 (#230) 2019-05-11 15:18:52 -04:00
Tim Kuehn
593ac135ce Remove stable features from doc examples 2019-04-30 13:18:39 -07:00
Tim Kuehn
05a924d27f Bump tarpc version to 0.17.0 2019-04-30 13:01:45 -07:00
Artem Vorotnikov
af9d71ed0d Bump futures to 0.3.0-alpha.15 (#226) 2019-04-28 20:13:06 -07:00
23 changed files with 111 additions and 134 deletions

View File

@@ -33,7 +33,7 @@ arguments to tarpc fns.
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.15.0"
tarpc = "0.18.0"
```
The `service!` macro expands to a collection of items that form an
@@ -48,7 +48,7 @@ races!
Here's a small service.
```rust
#![feature(futures_api, pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
#![feature(arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
use futures::{

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-bincode-transport"
version = "0.5.0"
version = "0.7.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -14,10 +14,10 @@ description = "A bincode-based transport for tarpc services."
[dependencies]
bincode = "1"
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
futures_legacy = { version = "0.1", package = "futures" }
pin-utils = "0.1.0-alpha.4"
rpc = { package = "tarpc-lib", version = "0.4", path = "../rpc", features = ["serde1"] }
rpc = { package = "tarpc-lib", version = "0.6", path = "../rpc", features = ["serde1"] }
serde = "1.0"
tokio-io = "0.1"
async-bincode = "0.4"

View File

@@ -6,7 +6,7 @@
//! A TCP [`Transport`] that serializes as bincode.
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
#![feature(arbitrary_self_types, async_await)]
#![deny(missing_docs, missing_debug_implementations)]
use async_bincode::{AsyncBincodeStream, AsyncDestination};
@@ -131,7 +131,7 @@ where
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
{
Ok(new(await!(TcpStream::connect(addr).compat())?))
Ok(new(TcpStream::connect(addr).compat().await?))
}
/// Listens on `addr`, wrapping accepted connections in bincode transports.

View File

@@ -6,14 +6,7 @@
//! Tests client/server control flow.
#![feature(
test,
integer_atomics,
futures_api,
generators,
await_macro,
async_await
)]
#![feature(test, integer_atomics, async_await)]
use futures::{compat::Executor01CompatExt, prelude::*};
use libtest::stats::Stats;
@@ -40,8 +33,8 @@ async fn bench() -> io::Result<()> {
.compat(),
);
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = &mut await!(client::new::<u32, u32, _>(client::Config::default(), conn))?;
let conn = tarpc_bincode_transport::connect(&addr).await?;
let client = &mut client::new::<u32, u32, _>(client::Config::default(), conn).await?;
let total = 10_000usize;
let mut successful = 0u32;
@@ -49,7 +42,7 @@ async fn bench() -> io::Result<()> {
let mut durations = vec![];
for _ in 1..=total {
let now = Instant::now();
let response = await!(client.call(context::current(), 0u32));
let response = client.call(context::current(), 0u32).await;
let elapsed = now.elapsed();
match response {

View File

@@ -6,7 +6,7 @@
//! Tests client/server control flow.
#![feature(generators, await_macro, async_await, futures_api)]
#![feature(async_await)]
use futures::{
compat::{Executor01CompatExt, Future01CompatExt},
@@ -66,7 +66,7 @@ async fn run() -> io::Result<()> {
let wait = Delay::new(Instant::now() + delay).compat();
async move {
await!(wait).unwrap();
wait.await.unwrap();
Ok(request)
}
});
@@ -75,11 +75,8 @@ async fn run() -> io::Result<()> {
tokio_executor::spawn(server.unit_error().boxed().compat());
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = await!(client::new::<String, String, _>(
client::Config::default(),
conn
))?;
let conn = tarpc_bincode_transport::connect(&addr).await?;
let client = client::new::<String, String, _>(client::Config::default(), conn).await?;
// Proxy service
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
@@ -99,7 +96,7 @@ async fn run() -> io::Result<()> {
let handler = channel.respond_with(move |ctx, request| {
trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr);
let mut client = client.clone();
async move { await!(client.call(ctx, request)) }
async move { client.call(ctx, request).await }
});
tokio_executor::spawn(handler.unit_error().boxed().compat());
}
@@ -111,10 +108,9 @@ async fn run() -> io::Result<()> {
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;
let client = await!(client::new::<String, String, _>(
config,
await!(tarpc_bincode_transport::connect(&addr))?
))?;
let client =
client::new::<String, String, _>(config, tarpc_bincode_transport::connect(&addr).await?)
.await?;
// Make 3 speculative requests, returning only the quickest.
let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect();
@@ -126,10 +122,11 @@ async fn run() -> io::Result<()> {
let response = client.call(ctx, "ping".into());
requests.push(response.map(move |r| (trace_id, r)));
}
let (fastest_response, _) = await!(requests
let (fastest_response, _) = requests
.into_iter()
.collect::<FuturesUnordered<_>>()
.into_future());
.into_future()
.await;
let (trace_id, resp) = fastest_response.unwrap();
info!("[{}] fastest_response = {:?}", trace_id, resp);

View File

@@ -6,7 +6,7 @@
//! Tests client/server control flow.
#![feature(generators, await_macro, async_await, futures_api)]
#![feature(async_await)]
use futures::{
compat::{Executor01CompatExt, Future01CompatExt},
@@ -65,7 +65,7 @@ async fn run() -> io::Result<()> {
let sleep = Delay::new(Instant::now() + delay).compat();
async {
await!(sleep).unwrap();
sleep.await.unwrap();
Ok(request)
}
});
@@ -78,8 +78,8 @@ async fn run() -> io::Result<()> {
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
let client = await!(client::new::<String, String, _>(config, conn))?;
let conn = tarpc_bincode_transport::connect(&addr).await?;
let client = client::new::<String, String, _>(config, conn).await?;
let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
for mut client in clients {
@@ -88,7 +88,7 @@ async fn run() -> io::Result<()> {
async move {
let trace_id = *ctx.trace_id();
let response = client.call(ctx, "ping".into());
match await!(response) {
match response.await {
Ok(response) => info!("[{}] response: {}", trace_id, response),
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
}

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-example-service"
version = "0.4.0"
version = "0.6.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018"
license = "MIT"
@@ -13,11 +13,11 @@ readme = "../README.md"
description = "An example server built on tarpc."
[dependencies]
bincode-transport = { package = "tarpc-bincode-transport", version = "0.5", path = "../bincode-transport" }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
clap = "2.0"
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
serde = { version = "1.0" }
tarpc = { version = "0.16", path = "../tarpc", features = ["serde1"] }
tarpc = { version = "0.18", path = "../tarpc", features = ["serde1"] }
tokio = "0.1"
tokio-executor = "0.1"

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
#![feature(arbitrary_self_types, async_await)]
use clap::{App, Arg};
use futures::{compat::Executor01CompatExt, prelude::*};
@@ -12,17 +12,17 @@ use std::{io, net::SocketAddr};
use tarpc::{client, context};
async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> {
let transport = await!(bincode_transport::connect(&server_addr))?;
let transport = bincode_transport::connect(&server_addr).await?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = await!(service::new_stub(client::Config::default(), transport))?;
let mut client = service::new_stub(client::Config::default(), transport).await?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = await!(client.hello(context::current(), name))?;
let hello = client.hello(context::current(), name).await?;
println!("{}", hello);

View File

@@ -4,13 +4,7 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(
futures_api,
arbitrary_self_types,
await_macro,
async_await,
proc_macro_hygiene
)]
#![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
// This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String.

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(futures_api, arbitrary_self_types, await_macro, async_await)]
#![feature(arbitrary_self_types, async_await)]
use clap::{App, Arg};
use futures::{
@@ -47,7 +47,7 @@ async fn run(server_addr: SocketAddr) -> io::Result<()> {
// the generated Service trait.
.respond_with(service::serve(HelloServer));
await!(server);
server.await;
Ok(())
}

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc-lib"
version = "0.4.0"
version = "0.6.0"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = '2018'
license = "MIT"
@@ -18,7 +18,7 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
[dependencies]
fnv = "1.0"
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
humantime = "1.0"
log = "0.4"
pin-utils = "0.1.0-alpha.4"
@@ -28,6 +28,6 @@ trace = { package = "tarpc-trace", version = "0.2", path = "../trace" }
serde = { optional = true, version = "1.0" }
[dev-dependencies]
futures-test-preview = { version = "0.3.0-alpha.14" }
futures-test-preview = { version = "0.3.0-alpha.16" }
env_logger = "0.6"
tokio = "0.1"

View File

@@ -147,5 +147,5 @@ where
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
});
Ok(await!(channel::spawn(config, transport, server_addr))?)
Ok(channel::spawn(config, transport, server_addr).await?)
}

View File

@@ -8,9 +8,7 @@
non_exhaustive,
integer_atomics,
try_trait,
futures_api,
arbitrary_self_types,
await_macro,
async_await
)]
#![deny(missing_docs, missing_debug_implementations)]

View File

@@ -494,7 +494,10 @@ where
},
};
trace!("[{}/{}] Sending response.", trace_id, peer);
await!(response_tx.send((ctx, response)).unwrap_or_else(|_| ()));
response_tx
.send((ctx, response))
.unwrap_or_else(|_| ())
.await;
},
);
let (abortable_response, abort_handle) = abortable(response);

View File

@@ -121,10 +121,10 @@ mod tests {
});
let responses = async {
let mut client = await!(client::new(client::Config::default(), client_channel))?;
let mut client = client::new(client::Config::default(), client_channel).await?;
let response1 = await!(client.call(context::current(), "123".into()));
let response2 = await!(client.call(context::current(), "abc".into()));
let response1 = client.call(context::current(), "123".into()).await;
let response2 = client.call(context::current(), "abc".into()).await;
Ok::<_, io::Error>((response1, response2))
};

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.16.0"
version = "0.18.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"
@@ -19,17 +19,17 @@ serde1 = ["rpc/serde1", "serde", "serde/derive"]
travis-ci = { repository = "google/tarpc" }
[dependencies]
futures-preview = { version = "0.3.0-alpha.14", features = ["compat"] }
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
log = "0.4"
serde = { optional = true, version = "1.0" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.4" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.6" }
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
[dev-dependencies]
bincode = "1"
bytes = { version = "0.4", features = ["serde"] }
humantime = "1.0"
bincode-transport = { package = "tarpc-bincode-transport", version = "0.5", path = "../bincode-transport" }
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
env_logger = "0.6"
libtest = "0.0.1"
tokio = "0.1"

View File

@@ -6,8 +6,6 @@
#![feature(
arbitrary_self_types,
futures_api,
await_macro,
async_await,
existential_type,
proc_macro_hygiene
@@ -100,7 +98,7 @@ impl publisher::Service for Publisher {
// Ignore failing subscribers. In a real pubsub,
// you'd want to continually retry until subscribers
// ack.
let _ = await!(client.receive(context::current(), message.clone()));
let _ = client.receive(context::current(), message.clone()).await;
}
}
@@ -115,8 +113,8 @@ impl publisher::Service for Publisher {
id: u32,
addr: SocketAddr,
) -> io::Result<()> {
let conn = await!(bincode_transport::connect(&addr))?;
let subscriber = await!(subscriber::new_stub(client::Config::default(), conn))?;
let conn = bincode_transport::connect(&addr).await?;
let subscriber = subscriber::new_stub(client::Config::default(), conn).await?;
println!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
Ok(())
@@ -154,27 +152,34 @@ async fn run() -> io::Result<()> {
.compat(),
);
let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?;
let subscriber2 = await!(Subscriber::listen(1, server::Config::default()))?;
let subscriber1 = Subscriber::listen(0, server::Config::default()).await?;
let subscriber2 = Subscriber::listen(1, server::Config::default()).await?;
let publisher_conn = bincode_transport::connect(&publisher_addr);
let publisher_conn = await!(publisher_conn)?;
let mut publisher = await!(publisher::new_stub(
client::Config::default(),
publisher_conn
))?;
let publisher_conn = publisher_conn.await?;
let mut publisher = publisher::new_stub(client::Config::default(), publisher_conn).await?;
if let Err(e) = await!(publisher.subscribe(context::current(), 0, subscriber1))? {
if let Err(e) = publisher
.subscribe(context::current(), 0, subscriber1)
.await?
{
eprintln!("Couldn't subscribe subscriber 0: {}", e);
}
if let Err(e) = await!(publisher.subscribe(context::current(), 1, subscriber2))? {
if let Err(e) = publisher
.subscribe(context::current(), 1, subscriber2)
.await?
{
eprintln!("Couldn't subscribe subscriber 1: {}", e);
}
println!("Broadcasting...");
await!(publisher.broadcast(context::current(), "hello to all".to_string()))?;
await!(publisher.unsubscribe(context::current(), 1))?;
await!(publisher.broadcast(context::current(), "hi again".to_string()))?;
publisher
.broadcast(context::current(), "hello to all".to_string())
.await?;
publisher.unsubscribe(context::current(), 1).await?;
publisher
.broadcast(context::current(), "hi again".to_string())
.await?;
Ok(())
}

View File

@@ -4,13 +4,7 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(
futures_api,
arbitrary_self_types,
await_macro,
async_await,
proc_macro_hygiene
)]
#![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
use futures::{
compat::Executor01CompatExt,
@@ -64,17 +58,17 @@ async fn run() -> io::Result<()> {
tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&addr))?;
let transport = bincode_transport::connect(&addr).await?;
// new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = await!(new_stub(client::Config::default(), transport))?;
let mut client = new_stub(client::Config::default(), transport).await?;
// The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = await!(client.hello(context::current(), "Stim".to_string()))?;
let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello);

View File

@@ -7,8 +7,6 @@
#![feature(
existential_type,
arbitrary_self_types,
futures_api,
await_macro,
async_await,
proc_macro_hygiene
)]
@@ -60,8 +58,10 @@ impl DoubleService for DoubleServer {
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
async fn double(mut client: add::Client, x: i32) -> Result<i32, String> {
let result = await!(client.add(context::current(), x, x));
result.map_err(|e| e.to_string())
client
.add(context::current(), x, x)
.await
.map_err(|e| e.to_string())
}
double(self.add_client.clone(), x)
@@ -77,8 +77,8 @@ async fn run() -> io::Result<()> {
.respond_with(add::serve(AddServer));
tokio_executor::spawn(add_server.unit_error().boxed().compat());
let to_add_server = await!(bincode_transport::connect(&addr))?;
let add_client = await!(add::new_stub(client::Config::default(), to_add_server))?;
let to_add_server = bincode_transport::connect(&addr).await?;
let add_client = add::new_stub(client::Config::default(), to_add_server).await?;
let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = double_listener.local_addr();
@@ -88,14 +88,11 @@ async fn run() -> io::Result<()> {
.respond_with(double::serve(DoubleServer { add_client }));
tokio_executor::spawn(double_server.unit_error().boxed().compat());
let to_double_server = await!(bincode_transport::connect(&addr))?;
let mut double_client = await!(double::new_stub(
client::Config::default(),
to_double_server
))?;
let to_double_server = bincode_transport::connect(&addr).await?;
let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?;
for i in 1..=5 {
println!("{:?}", await!(double_client.double(context::current(), i))?);
println!("{:?}", double_client.double(context::current(), i).await?);
}
Ok(())
}

View File

@@ -1,7 +1,5 @@
#![feature(
async_await,
await_macro,
futures_api,
arbitrary_self_types,
proc_macro_hygiene,
impl_trait_in_bindings
@@ -88,7 +86,7 @@ mod registry {
serve: move |cx, req: Bytes| {
async move {
let req = deserialize.clone()(req)?;
let response = await!(serve.clone()(cx, req))?;
let response = serve.clone()(cx, req).await?;
let response = serialize.clone()(response)?;
Ok(ServiceResponse { response })
}
@@ -392,8 +390,8 @@ async fn run() -> io::Result<()> {
.respond_with(registry.serve());
tokio_executor::spawn(server.unit_error().boxed().compat());
let transport = await!(bincode_transport::connect(&server_addr))?;
let channel = await!(client::new(client::Config::default(), transport))?;
let transport = bincode_transport::connect(&server_addr).await?;
let channel = client::new(client::Config::default(), transport).await?;
let write_client = new_client("WriteService".to_string(), &channel);
let mut write_client = write_service::Client::from(write_client);
@@ -401,8 +399,12 @@ async fn run() -> io::Result<()> {
let read_client = new_client("ReadService".to_string(), &channel);
let mut read_client = read_service::Client::from(read_client);
await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
let val = await!(read_client.read(context::current(), "key".to_string()))?;
write_client
.write(context::current(), "key".to_string(), "val".to_string())
.await?;
let val = read_client
.read(context::current(), "key".to_string())
.await?;
println!("{:?}", val);
Ok(())

View File

@@ -7,10 +7,7 @@
#![doc(include = "../README.md")]
#![deny(missing_docs, missing_debug_implementations)]
#![feature(async_await, external_doc)]
#![cfg_attr(
test,
feature(futures_api, await_macro, proc_macro_hygiene, arbitrary_self_types)
)]
#![cfg_attr(test, feature(proc_macro_hygiene, arbitrary_self_types))]
#[doc(hidden)]
pub use futures;

View File

@@ -30,7 +30,7 @@ macro_rules! add_serde_if_enabled {
/// Rpc methods are specified, mirroring trait syntax:
///
/// ```
/// # #![feature(await_macro, pin, arbitrary_self_types, async_await, futures_api, proc_macro_hygiene)]
/// # #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
/// # fn main() {}
/// # tarpc::service! {
/// /// Say hello
@@ -222,7 +222,7 @@ macro_rules! service {
Item = $crate::Response<Response>,
SinkItem = $crate::ClientMessage<Request>> + Send + 'static,
{
Ok(Client(await!($crate::client::new(config, transport))?))
Ok(Client($crate::client::new(config, transport).await?))
}
impl<C> From<C> for Client<C>
@@ -244,7 +244,7 @@ macro_rules! service {
let request__ = Request::$fn_name { $($arg,)* };
let resp = $crate::Client::call(&mut self.0, ctx, request__);
async move {
match await!(resp)? {
match resp.await? {
Response::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
_ => unreachable!(),
}
@@ -328,11 +328,11 @@ mod functional_test {
.compat(),
);
let mut client = await!(new_stub(client::Config::default(), tx))?;
assert_eq!(3, await!(client.add(context::current(), 1, 2))?);
let mut client = new_stub(client::Config::default(), tx).await?;
assert_eq!(3, client.add(context::current(), 1, 2).await?);
assert_eq!(
"Hey, Tim.",
await!(client.hey(context::current(), "Tim".to_string()))?
client.hey(context::current(), "Tim".to_string()).await?
);
Ok::<_, io::Error>(())
}
@@ -357,7 +357,7 @@ mod functional_test {
.compat(),
);
let client = await!(new_stub(client::Config::default(), tx))?;
let client = new_stub(client::Config::default(), tx).await?;
let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone();
@@ -365,9 +365,9 @@ mod functional_test {
let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string());
assert_eq!(3, await!(req1)?);
assert_eq!(7, await!(req2)?);
assert_eq!("Hey, Tim.", await!(req3)?);
assert_eq!(3, req1.await?);
assert_eq!(7, req2.await?);
assert_eq!("Hey, Tim.", req3.await?);
Ok::<_, io::Error>(())
}
.map_err(|e| panic!("test failed: {}", e));

View File

@@ -8,9 +8,6 @@
test,
arbitrary_self_types,
integer_atomics,
futures_api,
generators,
await_macro,
async_await,
proc_macro_hygiene
)]
@@ -57,8 +54,8 @@ async fn bench() -> io::Result<()> {
.compat(),
);
let conn = await!(bincode_transport::connect(&addr))?;
let mut client = await!(ack::new_stub(client::Config::default(), conn))?;
let conn = bincode_transport::connect(&addr).await?;
let mut client = ack::new_stub(client::Config::default(), conn).await?;
let total = 10_000usize;
let mut successful = 0u32;
@@ -66,7 +63,7 @@ async fn bench() -> io::Result<()> {
let mut durations = vec![];
for _ in 1..=total {
let now = Instant::now();
let response = await!(client.ack(context::current()));
let response = client.ack(context::current()).await;
let elapsed = now.elapsed();
match response {