mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6745cee72c | ||
|
|
31abea18b3 | ||
|
|
593ac135ce |
@@ -33,7 +33,7 @@ arguments to tarpc fns.
|
||||
Add to your `Cargo.toml` dependencies:
|
||||
|
||||
```toml
|
||||
tarpc = "0.17.0"
|
||||
tarpc = "0.18.0"
|
||||
```
|
||||
|
||||
The `service!` macro expands to a collection of items that form an
|
||||
@@ -48,7 +48,7 @@ races!
|
||||
Here's a small service.
|
||||
|
||||
```rust
|
||||
#![feature(pin, arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
|
||||
#![feature(arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]
|
||||
|
||||
|
||||
use futures::{
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc-bincode-transport"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = '2018'
|
||||
license = "MIT"
|
||||
@@ -14,10 +14,10 @@ description = "A bincode-based transport for tarpc services."
|
||||
|
||||
[dependencies]
|
||||
bincode = "1"
|
||||
futures-preview = { version = "0.3.0-alpha.15", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
|
||||
futures_legacy = { version = "0.1", package = "futures" }
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
rpc = { package = "tarpc-lib", version = "0.5", path = "../rpc", features = ["serde1"] }
|
||||
rpc = { package = "tarpc-lib", version = "0.6", path = "../rpc", features = ["serde1"] }
|
||||
serde = "1.0"
|
||||
tokio-io = "0.1"
|
||||
async-bincode = "0.4"
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
//! A TCP [`Transport`] that serializes as bincode.
|
||||
|
||||
#![feature(arbitrary_self_types, await_macro, async_await)]
|
||||
#![feature(arbitrary_self_types, async_await)]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
|
||||
use async_bincode::{AsyncBincodeStream, AsyncDestination};
|
||||
@@ -131,7 +131,7 @@ where
|
||||
Item: for<'de> Deserialize<'de>,
|
||||
SinkItem: Serialize,
|
||||
{
|
||||
Ok(new(await!(TcpStream::connect(addr).compat())?))
|
||||
Ok(new(TcpStream::connect(addr).compat().await?))
|
||||
}
|
||||
|
||||
/// Listens on `addr`, wrapping accepted connections in bincode transports.
|
||||
|
||||
@@ -6,13 +6,7 @@
|
||||
|
||||
//! Tests client/server control flow.
|
||||
|
||||
#![feature(
|
||||
test,
|
||||
integer_atomics,
|
||||
generators,
|
||||
await_macro,
|
||||
async_await
|
||||
)]
|
||||
#![feature(test, integer_atomics, async_await)]
|
||||
|
||||
use futures::{compat::Executor01CompatExt, prelude::*};
|
||||
use libtest::stats::Stats;
|
||||
@@ -39,8 +33,8 @@ async fn bench() -> io::Result<()> {
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
|
||||
let client = &mut await!(client::new::<u32, u32, _>(client::Config::default(), conn))?;
|
||||
let conn = tarpc_bincode_transport::connect(&addr).await?;
|
||||
let client = &mut client::new::<u32, u32, _>(client::Config::default(), conn).await?;
|
||||
|
||||
let total = 10_000usize;
|
||||
let mut successful = 0u32;
|
||||
@@ -48,7 +42,7 @@ async fn bench() -> io::Result<()> {
|
||||
let mut durations = vec![];
|
||||
for _ in 1..=total {
|
||||
let now = Instant::now();
|
||||
let response = await!(client.call(context::current(), 0u32));
|
||||
let response = client.call(context::current(), 0u32).await;
|
||||
let elapsed = now.elapsed();
|
||||
|
||||
match response {
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
//! Tests client/server control flow.
|
||||
|
||||
#![feature(generators, await_macro, async_await)]
|
||||
#![feature(async_await)]
|
||||
|
||||
use futures::{
|
||||
compat::{Executor01CompatExt, Future01CompatExt},
|
||||
@@ -66,7 +66,7 @@ async fn run() -> io::Result<()> {
|
||||
|
||||
let wait = Delay::new(Instant::now() + delay).compat();
|
||||
async move {
|
||||
await!(wait).unwrap();
|
||||
wait.await.unwrap();
|
||||
Ok(request)
|
||||
}
|
||||
});
|
||||
@@ -75,11 +75,8 @@ async fn run() -> io::Result<()> {
|
||||
|
||||
tokio_executor::spawn(server.unit_error().boxed().compat());
|
||||
|
||||
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
|
||||
let client = await!(client::new::<String, String, _>(
|
||||
client::Config::default(),
|
||||
conn
|
||||
))?;
|
||||
let conn = tarpc_bincode_transport::connect(&addr).await?;
|
||||
let client = client::new::<String, String, _>(client::Config::default(), conn).await?;
|
||||
|
||||
// Proxy service
|
||||
let listener = tarpc_bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
@@ -99,7 +96,7 @@ async fn run() -> io::Result<()> {
|
||||
let handler = channel.respond_with(move |ctx, request| {
|
||||
trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr);
|
||||
let mut client = client.clone();
|
||||
async move { await!(client.call(ctx, request)) }
|
||||
async move { client.call(ctx, request).await }
|
||||
});
|
||||
tokio_executor::spawn(handler.unit_error().boxed().compat());
|
||||
}
|
||||
@@ -111,10 +108,9 @@ async fn run() -> io::Result<()> {
|
||||
config.max_in_flight_requests = 10;
|
||||
config.pending_request_buffer = 10;
|
||||
|
||||
let client = await!(client::new::<String, String, _>(
|
||||
config,
|
||||
await!(tarpc_bincode_transport::connect(&addr))?
|
||||
))?;
|
||||
let client =
|
||||
client::new::<String, String, _>(config, tarpc_bincode_transport::connect(&addr).await?)
|
||||
.await?;
|
||||
|
||||
// Make 3 speculative requests, returning only the quickest.
|
||||
let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect();
|
||||
@@ -126,10 +122,11 @@ async fn run() -> io::Result<()> {
|
||||
let response = client.call(ctx, "ping".into());
|
||||
requests.push(response.map(move |r| (trace_id, r)));
|
||||
}
|
||||
let (fastest_response, _) = await!(requests
|
||||
let (fastest_response, _) = requests
|
||||
.into_iter()
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.into_future());
|
||||
.into_future()
|
||||
.await;
|
||||
let (trace_id, resp) = fastest_response.unwrap();
|
||||
info!("[{}] fastest_response = {:?}", trace_id, resp);
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
//! Tests client/server control flow.
|
||||
|
||||
#![feature(generators, await_macro, async_await)]
|
||||
#![feature(async_await)]
|
||||
|
||||
use futures::{
|
||||
compat::{Executor01CompatExt, Future01CompatExt},
|
||||
@@ -65,7 +65,7 @@ async fn run() -> io::Result<()> {
|
||||
|
||||
let sleep = Delay::new(Instant::now() + delay).compat();
|
||||
async {
|
||||
await!(sleep).unwrap();
|
||||
sleep.await.unwrap();
|
||||
Ok(request)
|
||||
}
|
||||
});
|
||||
@@ -78,8 +78,8 @@ async fn run() -> io::Result<()> {
|
||||
config.max_in_flight_requests = 10;
|
||||
config.pending_request_buffer = 10;
|
||||
|
||||
let conn = await!(tarpc_bincode_transport::connect(&addr))?;
|
||||
let client = await!(client::new::<String, String, _>(config, conn))?;
|
||||
let conn = tarpc_bincode_transport::connect(&addr).await?;
|
||||
let client = client::new::<String, String, _>(config, conn).await?;
|
||||
|
||||
let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
|
||||
for mut client in clients {
|
||||
@@ -88,7 +88,7 @@ async fn run() -> io::Result<()> {
|
||||
async move {
|
||||
let trace_id = *ctx.trace_id();
|
||||
let response = client.call(ctx, "ping".into());
|
||||
match await!(response) {
|
||||
match response.await {
|
||||
Ok(response) => info!("[{}] response: {}", trace_id, response),
|
||||
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc-example-service"
|
||||
version = "0.5.0"
|
||||
version = "0.6.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
@@ -13,11 +13,11 @@ readme = "../README.md"
|
||||
description = "An example server built on tarpc."
|
||||
|
||||
[dependencies]
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.6", path = "../bincode-transport" }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
|
||||
clap = "2.0"
|
||||
futures-preview = { version = "0.3.0-alpha.15", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
|
||||
serde = { version = "1.0" }
|
||||
tarpc = { version = "0.17", path = "../tarpc", features = ["serde1"] }
|
||||
tarpc = { version = "0.18", path = "../tarpc", features = ["serde1"] }
|
||||
tokio = "0.1"
|
||||
tokio-executor = "0.1"
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(arbitrary_self_types, await_macro, async_await)]
|
||||
#![feature(arbitrary_self_types, async_await)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{compat::Executor01CompatExt, prelude::*};
|
||||
@@ -12,17 +12,17 @@ use std::{io, net::SocketAddr};
|
||||
use tarpc::{client, context};
|
||||
|
||||
async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> {
|
||||
let transport = await!(bincode_transport::connect(&server_addr))?;
|
||||
let transport = bincode_transport::connect(&server_addr).await?;
|
||||
|
||||
// new_stub is generated by the service! macro. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the macro.
|
||||
// by the service mcro.
|
||||
let mut client = await!(service::new_stub(client::Config::default(), transport))?;
|
||||
let mut client = service::new_stub(client::Config::default(), transport).await?;
|
||||
|
||||
// The client has an RPC method for each RPC defined in service!. It takes the same args
|
||||
// as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||
let hello = await!(client.hello(context::current(), name))?;
|
||||
let hello = client.hello(context::current(), name).await?;
|
||||
|
||||
println!("{}", hello);
|
||||
|
||||
|
||||
@@ -4,12 +4,7 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
#![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
|
||||
|
||||
// This is the service definition. It looks a lot like a trait definition.
|
||||
// It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(arbitrary_self_types, await_macro, async_await)]
|
||||
#![feature(arbitrary_self_types, async_await)]
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{
|
||||
@@ -47,7 +47,7 @@ async fn run(server_addr: SocketAddr) -> io::Result<()> {
|
||||
// the generated Service trait.
|
||||
.respond_with(service::serve(HelloServer));
|
||||
|
||||
await!(server);
|
||||
server.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc-lib"
|
||||
version = "0.5.0"
|
||||
version = "0.6.0"
|
||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||
edition = '2018'
|
||||
license = "MIT"
|
||||
@@ -18,7 +18,7 @@ serde1 = ["trace/serde", "serde", "serde/derive"]
|
||||
|
||||
[dependencies]
|
||||
fnv = "1.0"
|
||||
futures-preview = { version = "0.3.0-alpha.15", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
|
||||
humantime = "1.0"
|
||||
log = "0.4"
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
@@ -28,6 +28,6 @@ trace = { package = "tarpc-trace", version = "0.2", path = "../trace" }
|
||||
serde = { optional = true, version = "1.0" }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-test-preview = { version = "0.3.0-alpha.15" }
|
||||
futures-test-preview = { version = "0.3.0-alpha.16" }
|
||||
env_logger = "0.6"
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -147,5 +147,5 @@ where
|
||||
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
|
||||
});
|
||||
|
||||
Ok(await!(channel::spawn(config, transport, server_addr))?)
|
||||
Ok(channel::spawn(config, transport, server_addr).await?)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
integer_atomics,
|
||||
try_trait,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await
|
||||
)]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
|
||||
@@ -494,7 +494,10 @@ where
|
||||
},
|
||||
};
|
||||
trace!("[{}/{}] Sending response.", trace_id, peer);
|
||||
await!(response_tx.send((ctx, response)).unwrap_or_else(|_| ()));
|
||||
response_tx
|
||||
.send((ctx, response))
|
||||
.unwrap_or_else(|_| ())
|
||||
.await;
|
||||
},
|
||||
);
|
||||
let (abortable_response, abort_handle) = abortable(response);
|
||||
|
||||
@@ -121,10 +121,10 @@ mod tests {
|
||||
});
|
||||
|
||||
let responses = async {
|
||||
let mut client = await!(client::new(client::Config::default(), client_channel))?;
|
||||
let mut client = client::new(client::Config::default(), client_channel).await?;
|
||||
|
||||
let response1 = await!(client.call(context::current(), "123".into()));
|
||||
let response2 = await!(client.call(context::current(), "abc".into()));
|
||||
let response1 = client.call(context::current(), "123".into()).await;
|
||||
let response2 = client.call(context::current(), "abc".into()).await;
|
||||
|
||||
Ok::<_, io::Error>((response1, response2))
|
||||
};
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
@@ -19,17 +19,17 @@ serde1 = ["rpc/serde1", "serde", "serde/derive"]
|
||||
travis-ci = { repository = "google/tarpc" }
|
||||
|
||||
[dependencies]
|
||||
futures-preview = { version = "0.3.0-alpha.15", features = ["compat"] }
|
||||
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
|
||||
log = "0.4"
|
||||
serde = { optional = true, version = "1.0" }
|
||||
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.5" }
|
||||
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.6" }
|
||||
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
|
||||
|
||||
[dev-dependencies]
|
||||
bincode = "1"
|
||||
bytes = { version = "0.4", features = ["serde"] }
|
||||
humantime = "1.0"
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.6", path = "../bincode-transport" }
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
|
||||
env_logger = "0.6"
|
||||
libtest = "0.0.1"
|
||||
tokio = "0.1"
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#![feature(
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
existential_type,
|
||||
proc_macro_hygiene
|
||||
@@ -99,7 +98,7 @@ impl publisher::Service for Publisher {
|
||||
// Ignore failing subscribers. In a real pubsub,
|
||||
// you'd want to continually retry until subscribers
|
||||
// ack.
|
||||
let _ = await!(client.receive(context::current(), message.clone()));
|
||||
let _ = client.receive(context::current(), message.clone()).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,8 +113,8 @@ impl publisher::Service for Publisher {
|
||||
id: u32,
|
||||
addr: SocketAddr,
|
||||
) -> io::Result<()> {
|
||||
let conn = await!(bincode_transport::connect(&addr))?;
|
||||
let subscriber = await!(subscriber::new_stub(client::Config::default(), conn))?;
|
||||
let conn = bincode_transport::connect(&addr).await?;
|
||||
let subscriber = subscriber::new_stub(client::Config::default(), conn).await?;
|
||||
println!("Subscribing {}.", id);
|
||||
clients.lock().unwrap().insert(id, subscriber);
|
||||
Ok(())
|
||||
@@ -153,27 +152,34 @@ async fn run() -> io::Result<()> {
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?;
|
||||
let subscriber2 = await!(Subscriber::listen(1, server::Config::default()))?;
|
||||
let subscriber1 = Subscriber::listen(0, server::Config::default()).await?;
|
||||
let subscriber2 = Subscriber::listen(1, server::Config::default()).await?;
|
||||
|
||||
let publisher_conn = bincode_transport::connect(&publisher_addr);
|
||||
let publisher_conn = await!(publisher_conn)?;
|
||||
let mut publisher = await!(publisher::new_stub(
|
||||
client::Config::default(),
|
||||
publisher_conn
|
||||
))?;
|
||||
let publisher_conn = publisher_conn.await?;
|
||||
let mut publisher = publisher::new_stub(client::Config::default(), publisher_conn).await?;
|
||||
|
||||
if let Err(e) = await!(publisher.subscribe(context::current(), 0, subscriber1))? {
|
||||
if let Err(e) = publisher
|
||||
.subscribe(context::current(), 0, subscriber1)
|
||||
.await?
|
||||
{
|
||||
eprintln!("Couldn't subscribe subscriber 0: {}", e);
|
||||
}
|
||||
if let Err(e) = await!(publisher.subscribe(context::current(), 1, subscriber2))? {
|
||||
if let Err(e) = publisher
|
||||
.subscribe(context::current(), 1, subscriber2)
|
||||
.await?
|
||||
{
|
||||
eprintln!("Couldn't subscribe subscriber 1: {}", e);
|
||||
}
|
||||
|
||||
println!("Broadcasting...");
|
||||
await!(publisher.broadcast(context::current(), "hello to all".to_string()))?;
|
||||
await!(publisher.unsubscribe(context::current(), 1))?;
|
||||
await!(publisher.broadcast(context::current(), "hi again".to_string()))?;
|
||||
publisher
|
||||
.broadcast(context::current(), "hello to all".to_string())
|
||||
.await?;
|
||||
publisher.unsubscribe(context::current(), 1).await?;
|
||||
publisher
|
||||
.broadcast(context::current(), "hi again".to_string())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -4,12 +4,7 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
#![feature(
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
#![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
|
||||
|
||||
use futures::{
|
||||
compat::Executor01CompatExt,
|
||||
@@ -63,17 +58,17 @@ async fn run() -> io::Result<()> {
|
||||
|
||||
tokio_executor::spawn(server.unit_error().boxed().compat());
|
||||
|
||||
let transport = await!(bincode_transport::connect(&addr))?;
|
||||
let transport = bincode_transport::connect(&addr).await?;
|
||||
|
||||
// new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the macro.
|
||||
// by the service mcro.
|
||||
let mut client = await!(new_stub(client::Config::default(), transport))?;
|
||||
let mut client = new_stub(client::Config::default(), transport).await?;
|
||||
|
||||
// The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args
|
||||
// as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||
let hello = await!(client.hello(context::current(), "Stim".to_string()))?;
|
||||
let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
||||
|
||||
println!("{}", hello);
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
#![feature(
|
||||
existential_type,
|
||||
arbitrary_self_types,
|
||||
await_macro,
|
||||
async_await,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
@@ -59,8 +58,10 @@ impl DoubleService for DoubleServer {
|
||||
|
||||
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
|
||||
async fn double(mut client: add::Client, x: i32) -> Result<i32, String> {
|
||||
let result = await!(client.add(context::current(), x, x));
|
||||
result.map_err(|e| e.to_string())
|
||||
client
|
||||
.add(context::current(), x, x)
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
double(self.add_client.clone(), x)
|
||||
@@ -76,8 +77,8 @@ async fn run() -> io::Result<()> {
|
||||
.respond_with(add::serve(AddServer));
|
||||
tokio_executor::spawn(add_server.unit_error().boxed().compat());
|
||||
|
||||
let to_add_server = await!(bincode_transport::connect(&addr))?;
|
||||
let add_client = await!(add::new_stub(client::Config::default(), to_add_server))?;
|
||||
let to_add_server = bincode_transport::connect(&addr).await?;
|
||||
let add_client = add::new_stub(client::Config::default(), to_add_server).await?;
|
||||
|
||||
let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
|
||||
let addr = double_listener.local_addr();
|
||||
@@ -87,14 +88,11 @@ async fn run() -> io::Result<()> {
|
||||
.respond_with(double::serve(DoubleServer { add_client }));
|
||||
tokio_executor::spawn(double_server.unit_error().boxed().compat());
|
||||
|
||||
let to_double_server = await!(bincode_transport::connect(&addr))?;
|
||||
let mut double_client = await!(double::new_stub(
|
||||
client::Config::default(),
|
||||
to_double_server
|
||||
))?;
|
||||
let to_double_server = bincode_transport::connect(&addr).await?;
|
||||
let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?;
|
||||
|
||||
for i in 1..=5 {
|
||||
println!("{:?}", await!(double_client.double(context::current(), i))?);
|
||||
println!("{:?}", double_client.double(context::current(), i).await?);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
#![feature(
|
||||
async_await,
|
||||
await_macro,
|
||||
arbitrary_self_types,
|
||||
proc_macro_hygiene,
|
||||
impl_trait_in_bindings
|
||||
@@ -87,7 +86,7 @@ mod registry {
|
||||
serve: move |cx, req: Bytes| {
|
||||
async move {
|
||||
let req = deserialize.clone()(req)?;
|
||||
let response = await!(serve.clone()(cx, req))?;
|
||||
let response = serve.clone()(cx, req).await?;
|
||||
let response = serialize.clone()(response)?;
|
||||
Ok(ServiceResponse { response })
|
||||
}
|
||||
@@ -391,8 +390,8 @@ async fn run() -> io::Result<()> {
|
||||
.respond_with(registry.serve());
|
||||
tokio_executor::spawn(server.unit_error().boxed().compat());
|
||||
|
||||
let transport = await!(bincode_transport::connect(&server_addr))?;
|
||||
let channel = await!(client::new(client::Config::default(), transport))?;
|
||||
let transport = bincode_transport::connect(&server_addr).await?;
|
||||
let channel = client::new(client::Config::default(), transport).await?;
|
||||
|
||||
let write_client = new_client("WriteService".to_string(), &channel);
|
||||
let mut write_client = write_service::Client::from(write_client);
|
||||
@@ -400,8 +399,12 @@ async fn run() -> io::Result<()> {
|
||||
let read_client = new_client("ReadService".to_string(), &channel);
|
||||
let mut read_client = read_service::Client::from(read_client);
|
||||
|
||||
await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
|
||||
let val = await!(read_client.read(context::current(), "key".to_string()))?;
|
||||
write_client
|
||||
.write(context::current(), "key".to_string(), "val".to_string())
|
||||
.await?;
|
||||
let val = read_client
|
||||
.read(context::current(), "key".to_string())
|
||||
.await?;
|
||||
println!("{:?}", val);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -7,10 +7,7 @@
|
||||
#![doc(include = "../README.md")]
|
||||
#![deny(missing_docs, missing_debug_implementations)]
|
||||
#![feature(async_await, external_doc)]
|
||||
#![cfg_attr(
|
||||
test,
|
||||
feature(await_macro, proc_macro_hygiene, arbitrary_self_types)
|
||||
)]
|
||||
#![cfg_attr(test, feature(proc_macro_hygiene, arbitrary_self_types))]
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use futures;
|
||||
|
||||
@@ -30,7 +30,7 @@ macro_rules! add_serde_if_enabled {
|
||||
/// Rpc methods are specified, mirroring trait syntax:
|
||||
///
|
||||
/// ```
|
||||
/// # #![feature(await_macro, pin, arbitrary_self_types, async_await, proc_macro_hygiene)]
|
||||
/// # #![feature(arbitrary_self_types, async_await, proc_macro_hygiene)]
|
||||
/// # fn main() {}
|
||||
/// # tarpc::service! {
|
||||
/// /// Say hello
|
||||
@@ -222,7 +222,7 @@ macro_rules! service {
|
||||
Item = $crate::Response<Response>,
|
||||
SinkItem = $crate::ClientMessage<Request>> + Send + 'static,
|
||||
{
|
||||
Ok(Client(await!($crate::client::new(config, transport))?))
|
||||
Ok(Client($crate::client::new(config, transport).await?))
|
||||
}
|
||||
|
||||
impl<C> From<C> for Client<C>
|
||||
@@ -244,7 +244,7 @@ macro_rules! service {
|
||||
let request__ = Request::$fn_name { $($arg,)* };
|
||||
let resp = $crate::Client::call(&mut self.0, ctx, request__);
|
||||
async move {
|
||||
match await!(resp)? {
|
||||
match resp.await? {
|
||||
Response::$fn_name(msg__) => ::std::result::Result::Ok(msg__),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
@@ -328,11 +328,11 @@ mod functional_test {
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let mut client = await!(new_stub(client::Config::default(), tx))?;
|
||||
assert_eq!(3, await!(client.add(context::current(), 1, 2))?);
|
||||
let mut client = new_stub(client::Config::default(), tx).await?;
|
||||
assert_eq!(3, client.add(context::current(), 1, 2).await?);
|
||||
assert_eq!(
|
||||
"Hey, Tim.",
|
||||
await!(client.hey(context::current(), "Tim".to_string()))?
|
||||
client.hey(context::current(), "Tim".to_string()).await?
|
||||
);
|
||||
Ok::<_, io::Error>(())
|
||||
}
|
||||
@@ -357,7 +357,7 @@ mod functional_test {
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let client = await!(new_stub(client::Config::default(), tx))?;
|
||||
let client = new_stub(client::Config::default(), tx).await?;
|
||||
let mut c = client.clone();
|
||||
let req1 = c.add(context::current(), 1, 2);
|
||||
let mut c = client.clone();
|
||||
@@ -365,9 +365,9 @@ mod functional_test {
|
||||
let mut c = client.clone();
|
||||
let req3 = c.hey(context::current(), "Tim".to_string());
|
||||
|
||||
assert_eq!(3, await!(req1)?);
|
||||
assert_eq!(7, await!(req2)?);
|
||||
assert_eq!("Hey, Tim.", await!(req3)?);
|
||||
assert_eq!(3, req1.await?);
|
||||
assert_eq!(7, req2.await?);
|
||||
assert_eq!("Hey, Tim.", req3.await?);
|
||||
Ok::<_, io::Error>(())
|
||||
}
|
||||
.map_err(|e| panic!("test failed: {}", e));
|
||||
|
||||
@@ -8,8 +8,6 @@
|
||||
test,
|
||||
arbitrary_self_types,
|
||||
integer_atomics,
|
||||
generators,
|
||||
await_macro,
|
||||
async_await,
|
||||
proc_macro_hygiene
|
||||
)]
|
||||
@@ -56,8 +54,8 @@ async fn bench() -> io::Result<()> {
|
||||
.compat(),
|
||||
);
|
||||
|
||||
let conn = await!(bincode_transport::connect(&addr))?;
|
||||
let mut client = await!(ack::new_stub(client::Config::default(), conn))?;
|
||||
let conn = bincode_transport::connect(&addr).await?;
|
||||
let mut client = ack::new_stub(client::Config::default(), conn).await?;
|
||||
|
||||
let total = 10_000usize;
|
||||
let mut successful = 0u32;
|
||||
@@ -65,7 +63,7 @@ async fn bench() -> io::Result<()> {
|
||||
let mut durations = vec![];
|
||||
for _ in 1..=total {
|
||||
let now = Instant::now();
|
||||
let response = await!(client.ack(context::current()));
|
||||
let response = client.ack(context::current()).await;
|
||||
let elapsed = now.elapsed();
|
||||
|
||||
match response {
|
||||
|
||||
Reference in New Issue
Block a user