diff --git a/README.md b/README.md index 18e7816..5d07d22 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,7 @@ impl Service for HelloServer { } ``` -Next let's write a function to start our server. While this example uses an +Lastly let's write our `main` that will start the server. While this example uses an [in-process channel](https://docs.rs/tarpc/0.18.0/tarpc/transport/channel/struct.UnboundedChannel.html), tarpc also ships a @@ -136,7 +136,6 @@ that uses bincode over TCP. # extern crate futures; # # use futures::{ -# compat::Executor01CompatExt, # future::{self, Ready}, # prelude::*, # }; @@ -169,7 +168,8 @@ that uses bincode over TCP. # } # } # -async fn run() -> io::Result<()> { +#[runtime::main(runtime_tokio::Tokio)] +async fn main() -> io::Result<()> { let (client_transport, server_transport) = tarpc::transport::channel::unbounded(); let server = server::new(server::Config::default()) @@ -180,7 +180,7 @@ async fn run() -> io::Result<()> { // the generated Service trait. .respond_with(serve(HelloServer)); - tokio::spawn(server.unit_error().boxed().compat()); + let _ = runtime::spawn(server); // new_stub is generated by the service! macro. Like Server, it takes a config and any // Transport as input, and returns a Client, also generated by the macro. @@ -198,86 +198,6 @@ async fn run() -> io::Result<()> { } ``` -Lastly, we'll call `run()` from `main`. Before running a tarpc server or client, -call `tarpc::init()` to initialize the executor tarpc uses internally to run -background tasks for the client and server. - -```rust -# #![feature(async_await, proc_macro_hygiene)] -# extern crate futures; -# -# use futures::{ -# compat::Executor01CompatExt, -# future::{self, Ready}, -# prelude::*, -# }; -# use tarpc::{ -# client, context, -# server::{self, Handler}, -# }; -# use std::io; -# -# // This is the service definition. It looks a lot like a trait definition. -# // It defines one RPC, hello, which takes one arg, name, and returns a String. -# tarpc::service! { -# /// Returns a greeting for name. -# rpc hello(name: String) -> String; -# } -# -# // This is the type that implements the generated Service trait. It is the business logic -# // and is used to start the server. -# #[derive(Clone)] -# struct HelloServer; -# -# impl Service for HelloServer { -# // Each defined rpc generates two items in the trait, a fn that serves the RPC, and -# // an associated type representing the future output by the fn. -# -# type HelloFut = Ready; -# -# fn hello(self, _: context::Context, name: String) -> Self::HelloFut { -# future::ready(format!("Hello, {}!", name)) -# } -# } -# -# async fn run() -> io::Result<()> { -# let (client_transport, server_transport) = tarpc::transport::channel::unbounded(); -# -# let server = server::new(server::Config::default()) -# // incoming() takes a stream of transports such as would be returned by -# // TcpListener::incoming (but a stream instead of an iterator). -# .incoming(stream::once(future::ready(server_transport))) -# // serve is generated by the service! macro. It takes as input any type implementing -# // the generated Service trait. -# .respond_with(serve(HelloServer)); -# -# tokio::spawn(server.unit_error().boxed().compat()); -# -# // new_stub is generated by the service! macro. Like Server, it takes a config and any -# // Transport as input, and returns a Client, also generated by the macro. -# // by the service mcro. -# let mut client = new_stub(client::Config::default(), client_transport).await?; -# -# // The client has an RPC method for each RPC defined in service!. It takes the same args -# // as defined, with the addition of a Context, which is always the first arg. The Context -# // specifies a deadline and trace information which can be helpful in debugging requests. -# let hello = client.hello(context::current(), "Stim".to_string()).await?; -# -# println!("{}", hello); -# -# Ok(()) -# } -# -fn main() { - tarpc::init(tokio::executor::DefaultExecutor::current().compat()); - tokio::run(run() - .map_err(|e| eprintln!("Oh no: {}", e)) - .boxed() - .compat(), - ); -} - -``` ## Service Documentation diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index 90549cd..e2d5296 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -16,9 +16,10 @@ description = "An example server built on tarpc." json-transport = { package = "tarpc-json-transport", version = "0.1", path = "../json-transport" } clap = "2.0" futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +runtime = "0.3.0-alpha.6" +runtime-tokio = "0.3.0-alpha.5" serde = { version = "1.0" } tarpc = { version = "0.18", path = "../tarpc", features = ["serde1"] } -tokio = "0.1" env_logger = "0.6" [lib] diff --git a/example-service/src/client.rs b/example-service/src/client.rs index 4e69e61..1382fcc 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -7,29 +7,11 @@ #![feature(async_await)] use clap::{App, Arg}; -use futures::{compat::Executor01CompatExt, prelude::*}; -use std::{io, net::SocketAddr}; +use std::io; use tarpc::{client, context}; -async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> { - let transport = json_transport::connect(&server_addr).await?; - - // new_stub is generated by the service! macro. Like Server, it takes a config and any - // Transport as input, and returns a Client, also generated by the macro. - // by the service mcro. - let mut client = service::new_stub(client::Config::default(), transport).await?; - - // The client has an RPC method for each RPC defined in service!. It takes the same args - // as defined, with the addition of a Context, which is always the first arg. The Context - // specifies a deadline and trace information which can be helpful in debugging requests. - let hello = client.hello(context::current(), name).await?; - - println!("{}", hello); - - Ok(()) -} - -fn main() { +#[runtime::main(runtime_tokio::Tokio)] +async fn main() -> io::Result<()> { let flags = App::new("Hello Client") .version("0.1") .author("Tim ") @@ -53,21 +35,26 @@ fn main() { ) .get_matches(); - tarpc::init(tokio::executor::DefaultExecutor::current().compat()); - let server_addr = flags.value_of("server_addr").unwrap(); let server_addr = server_addr .parse() .unwrap_or_else(|e| panic!(r#"--server_addr value "{}" invalid: {}"#, server_addr, e)); - let name = flags.value_of("name").unwrap(); + let name = flags.value_of("name").unwrap().into(); - tarpc::init(tokio::executor::DefaultExecutor::current().compat()); + let transport = json_transport::connect(&server_addr).await?; - tokio::run( - run(server_addr, name.into()) - .map_err(|e| eprintln!("Oh no: {}", e)) - .boxed() - .compat(), - ); + // new_stub is generated by the service! macro. Like Server, it takes a config and any + // Transport as input, and returns a Client, also generated by the macro. + // by the service mcro. + let mut client = service::new_stub(client::Config::default(), transport).await?; + + // The client has an RPC method for each RPC defined in service!. It takes the same args + // as defined, with the addition of a Context, which is always the first arg. The Context + // specifies a deadline and trace information which can be helpful in debugging requests. + let hello = client.hello(context::current(), name).await?; + + println!("{}", hello); + + Ok(()) } diff --git a/example-service/src/server.rs b/example-service/src/server.rs index d5b30a2..5a33824 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -8,7 +8,6 @@ use clap::{App, Arg}; use futures::{ - compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -37,30 +36,8 @@ impl service::Service for HelloServer { } } -async fn run(server_addr: SocketAddr) -> io::Result<()> { - // bincode_transport is provided by the associated crate bincode-transport. It makes it easy - // to start up a serde-powered bincode serialization strategy over TCP. - json_transport::listen(&server_addr)? - // Ignore accept errors. - .filter_map(|r| future::ready(r.ok())) - .map(server::BaseChannel::with_defaults) - // Limit channels to 1 per IP. - .max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip()) - // serve is generated by the service! macro. It takes as input any type implementing - // the generated Service trait. - .map(|channel| { - let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap()); - channel.respond_with(service::serve(server)) - }) - // Max 10 channels. - .buffer_unordered(10) - .for_each(|_| futures::future::ready(())) - .await; - - Ok(()) -} - -fn main() { +#[runtime::main(runtime_tokio::Tokio)] +async fn main() -> io::Result<()> { env_logger::init(); let flags = App::new("Hello Server") @@ -83,12 +60,26 @@ fn main() { .parse() .unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e)); - tarpc::init(tokio::executor::DefaultExecutor::current().compat()); + let server_addr = ([0, 0, 0, 0], port).into(); - tokio::run( - run(([0, 0, 0, 0], port).into()) - .map_err(|e| eprintln!("Oh no: {}", e)) - .boxed() - .compat(), - ); + // bincode_transport is provided by the associated crate bincode-transport. It makes it easy + // to start up a serde-powered bincode serialization strategy over TCP. + json_transport::listen(&server_addr)? + // Ignore accept errors. + .filter_map(|r| future::ready(r.ok())) + .map(server::BaseChannel::with_defaults) + // Limit channels to 1 per IP. + .max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip()) + // serve is generated by the service! macro. It takes as input any type implementing + // the generated Service trait. + .map(|channel| { + let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap()); + channel.respond_with(service::serve(server)) + }) + // Max 10 channels. + .buffer_unordered(10) + .for_each(|_| futures::future::ready(())) + .await; + + Ok(()) } diff --git a/json-transport/Cargo.toml b/json-transport/Cargo.toml index 31451cb..19d6185 100644 --- a/json-transport/Cargo.toml +++ b/json-transport/Cargo.toml @@ -18,7 +18,7 @@ futures_legacy = { version = "0.1", package = "futures" } pin-utils = "0.1.0-alpha.4" serde = "1.0" serde_json = "1.0" -tokio = "0.1" +tokio = { version = "0.1", default-features = false, features = ["codec"] } tokio-io = "0.1" tokio-serde-json = "0.2" tokio-tcp = "0.1" diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index c93d940..ab680fe 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -24,6 +24,8 @@ log = "0.4" once_cell = "0.2" pin-utils = "0.1.0-alpha.4" rand = "0.7" +runtime = "0.3.0-alpha.6" +runtime-raw = "0.3.0-alpha.4" tokio-timer = "0.2" trace = { package = "tarpc-trace", version = "0.2", path = "../trace" } serde = { optional = true, version = "1.0" } @@ -31,6 +33,7 @@ serde = { optional = true, version = "1.0" } [dev-dependencies] futures-test-preview = { version = "0.3.0-alpha.17" } env_logger = "0.6" +runtime-tokio = "0.3.0-alpha.5" tokio = "0.1" tokio-executor = "0.1" assert_matches = "1.0" diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 8d1f6c6..d2d72e0 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -35,7 +35,7 @@ pub use crate::{client::Client, server::Server, transport::Transport}; use futures::{ task::{Poll, Spawn, SpawnError, SpawnExt}, - Future, + Future, FutureExt, }; use once_cell::sync::OnceCell; use std::{cell::RefCell, io, time::SystemTime}; @@ -141,7 +141,11 @@ pub fn init(spawn: impl Spawn + Clone + Send + Sync + 'static) { } pub(crate) fn spawn(future: impl Future + Send + 'static) -> Result<(), SpawnError> { - SPAWN.with(|spawn| spawn.borrow_mut().spawn(future)) + if SEED_SPAWN.get().is_some() { + SPAWN.with(|spawn| spawn.borrow_mut().spawn(future)) + } else { + runtime_raw::current_runtime().spawn_boxed(future.boxed()) + } } trait CloneSpawn: Spawn { diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index d3f1e21..e2786c3 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -82,64 +82,40 @@ mod tests { server::{Handler, Server}, transport, }; - use futures::compat::Executor01CompatExt; + use assert_matches::assert_matches; use futures::{prelude::*, stream}; use log::trace; use std::io; - #[test] - fn integration() { + #[runtime::test(runtime_tokio::Tokio)] + async fn integration() -> io::Result<()> { let _ = env_logger::try_init(); - crate::init(tokio::executor::DefaultExecutor::current().compat()); let (client_channel, server_channel) = transport::channel::unbounded(); - let server = Server::::default() - .incoming(stream::once(future::ready(server_channel))) - .respond_with(|_ctx, request| { - future::ready(request.parse::().map_err(|_| { - io::Error::new( - io::ErrorKind::InvalidInput, - format!("{:?} is not an int", request), - ) - })) - }); + crate::spawn( + Server::::default() + .incoming(stream::once(future::ready(server_channel))) + .respond_with(|_ctx, request| { + future::ready(request.parse::().map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("{:?} is not an int", request), + ) + })) + }), + ) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let responses = async { - let mut client = client::new(client::Config::default(), client_channel).await?; + let mut client = client::new(client::Config::default(), client_channel).await?; - let response1 = client.call(context::current(), "123".into()).await; - let response2 = client.call(context::current(), "abc".into()).await; - - Ok::<_, io::Error>((response1, response2)) - }; - - let (response1, response2) = run_future(future::join( - server, - responses.unwrap_or_else(|e| panic!(e)), - )) - .1; + let response1 = client.call(context::current(), "123".into()).await; + let response2 = client.call(context::current(), "abc".into()).await; trace!("response1: {:?}, response2: {:?}", response1, response2); - assert!(response1.is_ok()); - assert_eq!(response1.ok().unwrap(), 123); + assert_matches!(response1, Ok(123)); + assert_matches!(response2, Err(ref e) if e.kind() == io::ErrorKind::InvalidInput); - assert!(response2.is_err()); - assert_eq!(response2.err().unwrap().kind(), io::ErrorKind::InvalidInput); - } - - fn run_future(f: F) -> F::Output - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (tx, rx) = futures::channel::oneshot::channel(); - tokio::run( - f.map(|result| tx.send(result).unwrap_or_else(|_| unreachable!())) - .boxed() - .unit_error() - .compat(), - ); - futures::executor::block_on(rx).unwrap() + Ok(()) } } diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 5ecdfd2..a2b05f4 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -21,6 +21,7 @@ travis-ci = { repository = "google/tarpc" } [dependencies] futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } log = "0.4" +runtime = "0.3.0-alpha.6" serde = { optional = true, version = "1.0" } rpc = { package = "tarpc-lib", path = "../rpc", version = "0.6" } tarpc-plugins = { path = "../plugins", version = "0.5.0" } @@ -31,7 +32,6 @@ bytes = { version = "0.4", features = ["serde"] } humantime = "1.0" bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" } env_logger = "0.6" -tokio = "0.1" -tokio-executor = "0.1" +runtime-tokio = "0.3.0-alpha.5" tokio-tcp = "0.1" pin-utils = "0.1.0-alpha.4" diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index d1087a4..1df5ffb 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -58,14 +58,11 @@ impl Subscriber { let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); let addr = incoming.get_ref().local_addr(); - tokio_executor::spawn( + let _ = runtime::spawn( server::new(config) .incoming(incoming) .take(1) - .respond_with(subscriber::serve(Subscriber { id })) - .unit_error() - .boxed() - .compat(), + .respond_with(subscriber::serve(Subscriber { id })), ); Ok(addr) } @@ -134,19 +131,18 @@ impl publisher::Service for Publisher { } } -async fn run() -> io::Result<()> { +#[runtime::main(runtime_tokio::Tokio)] +async fn main() -> io::Result<()> { env_logger::init(); + let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); let publisher_addr = transport.get_ref().local_addr(); - tokio_executor::spawn( + let _ = runtime::spawn( transport .take(1) .map(server::BaseChannel::with_defaults) - .respond_with(publisher::serve(Publisher::new())) - .unit_error() - .boxed() - .compat(), + .respond_with(publisher::serve(Publisher::new())), ); let subscriber1 = Subscriber::listen(0, server::Config::default()).await?; @@ -177,10 +173,8 @@ async fn run() -> io::Result<()> { publisher .broadcast(context::current(), "hi again".to_string()) .await?; + + thread::sleep(Duration::from_millis(100)); + Ok(()) } - -fn main() { - tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat()); - thread::sleep(Duration::from_millis(100)); -} diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index 2996430..5f07e0b 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -7,7 +7,6 @@ #![feature(async_await, proc_macro_hygiene)] use futures::{ - compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -40,7 +39,8 @@ impl Service for HelloServer { } } -async fn run() -> io::Result<()> { +#[runtime::main(runtime_tokio::Tokio)] +async fn main() -> io::Result<()> { // bincode_transport is provided by the associated crate bincode-transport. It makes it easy // to start up a serde-powered bincode serialization strategy over TCP. let mut transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?; @@ -58,7 +58,7 @@ async fn run() -> io::Result<()> { // the generated Service trait. .respond_with(serve(HelloServer)); - tokio::spawn(server.unit_error().boxed().compat()); + let _ = runtime::spawn(server); let transport = bincode_transport::connect(&addr).await?; @@ -76,14 +76,3 @@ async fn run() -> io::Result<()> { Ok(()) } - -fn main() { - tarpc::init(tokio::executor::DefaultExecutor::current().compat()); - - tokio::run( - run() - .map_err(|e| eprintln!("Oh no: {}", e)) - .boxed() - .compat(), - ); -} diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 071720d..0f0a8a4 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -8,7 +8,6 @@ use crate::{add::Service as AddService, double::Service as DoubleService}; use futures::{ - compat::Executor01CompatExt, future::{self, Ready}, prelude::*, }; @@ -63,7 +62,10 @@ impl DoubleService for DoubleServer { } } -async fn run() -> io::Result<()> { +#[runtime::main(runtime_tokio::Tokio)] +async fn main() -> io::Result<()> { + env_logger::init(); + let add_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); let addr = add_listener.get_ref().local_addr(); @@ -71,7 +73,7 @@ async fn run() -> io::Result<()> { .incoming(add_listener) .take(1) .respond_with(add::serve(AddServer)); - tokio_executor::spawn(add_server.unit_error().boxed().compat()); + let _ = runtime::spawn(add_server); let to_add_server = bincode_transport::connect(&addr).await?; let add_client = add::new_stub(client::Config::default(), to_add_server).await?; @@ -83,7 +85,7 @@ async fn run() -> io::Result<()> { .incoming(double_listener) .take(1) .respond_with(double::serve(DoubleServer { add_client })); - tokio_executor::spawn(double_server.unit_error().boxed().compat()); + let _ = runtime::spawn(double_server); let to_double_server = bincode_transport::connect(&addr).await?; let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?; @@ -93,9 +95,3 @@ async fn run() -> io::Result<()> { } Ok(()) } - -fn main() { - env_logger::init(); - tarpc::init(tokio::executor::DefaultExecutor::current().compat()); - tokio::run(run().map_err(|e| panic!(e)).boxed().compat()); -} diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs index c151fd7..4e923f0 100644 --- a/tarpc/examples/service_registry.rs +++ b/tarpc/examples/service_registry.rs @@ -232,7 +232,6 @@ mod registry { // Example use bytes::Bytes; use futures::{ - compat::Executor01CompatExt, future::{ready, Ready}, prelude::*, }; @@ -294,19 +293,6 @@ impl read_service::Service for Server { } } -trait DefaultSpawn { - fn spawn(self); -} - -impl DefaultSpawn for F -where - F: Future + Send + 'static, -{ - fn spawn(self) { - tokio_executor::spawn(self.unit_error().boxed().compat()) - } -} - struct BincodeRegistry { registry: registry::Registry, } @@ -365,7 +351,10 @@ where registry::new_client(service_name, channel, serialize, deserialize) } -async fn run() -> io::Result<()> { +#[runtime::main(runtime_tokio::Tokio)] +async fn main() -> io::Result<()> { + env_logger::init(); + let server = Server::default(); let registry = BincodeRegistry::default() .register( @@ -384,7 +373,7 @@ async fn run() -> io::Result<()> { .incoming(listener) .take(1) .respond_with(registry.serve()); - tokio_executor::spawn(server.unit_error().boxed().compat()); + let _ = runtime::spawn(server); let transport = bincode_transport::connect(&server_addr).await?; let channel = client::new(client::Config::default(), transport).await?; @@ -405,8 +394,3 @@ async fn run() -> io::Result<()> { Ok(()) } - -fn main() { - tarpc::init(tokio::executor::DefaultExecutor::current().compat()); - tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat()); -} diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 1fde331..2856f10 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -280,13 +280,10 @@ mod syntax_test { #[cfg(test)] mod functional_test { use futures::{ - compat::Executor01CompatExt, future::{ready, Ready}, prelude::*, }; use rpc::{client, context, server::Handler, transport::channel}; - use std::io; - use tokio::runtime::current_thread; service! { rpc add(x: i32, y: i32) -> i32; @@ -310,66 +307,49 @@ mod functional_test { } } - #[test] - fn sequential() { + #[runtime::test(runtime_tokio::TokioCurrentThread)] + async fn sequential() { let _ = env_logger::try_init(); - rpc::init(tokio::executor::DefaultExecutor::current().compat()); - let test = async { - let (tx, rx) = channel::unbounded(); - tokio_executor::spawn( - crate::Server::default() - .incoming(stream::once(ready(rx))) - .respond_with(serve(Server)) - .unit_error() - .boxed() - .compat(), - ); + let (tx, rx) = channel::unbounded(); + let _ = runtime::spawn( + crate::Server::default() + .incoming(stream::once(ready(rx))) + .respond_with(serve(Server)), + ); - let mut client = new_stub(client::Config::default(), tx).await?; - assert_eq!(3, client.add(context::current(), 1, 2).await?); - assert_eq!( - "Hey, Tim.", - client.hey(context::current(), "Tim".to_string()).await? - ); - Ok::<_, io::Error>(()) - } - .map_err(|e| panic!(e.to_string())); - - current_thread::block_on_all(test.boxed().compat()).unwrap(); + let mut client = new_stub(client::Config::default(), tx).await.unwrap(); + assert_eq!(3, client.add(context::current(), 1, 2).await.unwrap()); + assert_eq!( + "Hey, Tim.", + client + .hey(context::current(), "Tim".to_string()) + .await + .unwrap() + ); } - #[test] - fn concurrent() { + #[runtime::test(runtime_tokio::TokioCurrentThread)] + async fn concurrent() { let _ = env_logger::try_init(); - rpc::init(tokio::executor::DefaultExecutor::current().compat()); - let test = async { - let (tx, rx) = channel::unbounded(); - tokio_executor::spawn( - rpc::Server::default() - .incoming(stream::once(ready(rx))) - .respond_with(serve(Server)) - .unit_error() - .boxed() - .compat(), - ); + let (tx, rx) = channel::unbounded(); + let _ = runtime::spawn( + rpc::Server::default() + .incoming(stream::once(ready(rx))) + .respond_with(serve(Server)), + ); - let client = new_stub(client::Config::default(), tx).await?; - let mut c = client.clone(); - let req1 = c.add(context::current(), 1, 2); - let mut c = client.clone(); - let req2 = c.add(context::current(), 3, 4); - let mut c = client.clone(); - let req3 = c.hey(context::current(), "Tim".to_string()); + let client = new_stub(client::Config::default(), tx).await.unwrap(); + let mut c = client.clone(); + let req1 = c.add(context::current(), 1, 2); + let mut c = client.clone(); + let req2 = c.add(context::current(), 3, 4); + let mut c = client.clone(); + let req3 = c.hey(context::current(), "Tim".to_string()); - assert_eq!(3, req1.await?); - assert_eq!(7, req2.await?); - assert_eq!("Hey, Tim.", req3.await?); - Ok::<_, io::Error>(()) - } - .map_err(|e| panic!("test failed: {}", e)); - - current_thread::block_on_all(test.boxed().compat()).unwrap(); + assert_eq!(3, req1.await.unwrap()); + assert_eq!(7, req2.await.unwrap()); + assert_eq!("Hey, Tim.", req3.await.unwrap()); } }