diff --git a/README.md b/README.md index dc52e99..d639f63 100644 --- a/README.md +++ b/README.md @@ -36,12 +36,9 @@ Add to your `Cargo.toml` dependencies: tarpc = "0.18.0" ``` -The `service!` macro expands to a collection of items that form an rpc service. -In the above example, the macro is called within the `hello_service` module. -This module will contain a `Client` stub and `Service` trait. These generated -types make it easy and ergonomic to write servers without dealing with -serialization directly. Simply implement one of the generated traits, and you're -off to the races! +The `tarpc::service` attribute expands to a collection of items that form an rpc service. +These generated types make it easy and ergonomic to write servers with less boilerplate. +Simply implement the generated service trait, and you're off to the races! ## Example @@ -77,13 +74,13 @@ use std::io; // This is the service definition. It looks a lot like a trait definition. // It defines one RPC, hello, which takes one arg, name, and returns a String. #[tarpc::service] -trait Service { +trait World { /// Returns a greeting for name. async fn hello(name: String) -> String; } ``` -This service definition generates a trait called `Service`. Next we need to +This service definition generates a trait called `World`. Next we need to implement it for our Server struct. ```rust @@ -104,17 +101,17 @@ implement it for our Server struct. # // This is the service definition. It looks a lot like a trait definition. # // It defines one RPC, hello, which takes one arg, name, and returns a String. # #[tarpc::service] -# trait Service { +# trait World { # /// Returns a greeting for name. # async fn hello(name: String) -> String; # } # -// This is the type that implements the generated Service trait. It is the business logic +// This is the type that implements the generated World trait. It is the business logic // and is used to start the server. #[derive(Clone)] struct HelloServer; -impl Service for HelloServer { +impl World for HelloServer { // Each defined rpc generates two items in the trait, a fn that serves the RPC, and // an associated type representing the future output by the fn. @@ -150,17 +147,17 @@ that uses bincode over TCP. # // This is the service definition. It looks a lot like a trait definition. # // It defines one RPC, hello, which takes one arg, name, and returns a String. # #[tarpc::service] -# trait Service { +# trait World { # /// Returns a greeting for name. # async fn hello(name: String) -> String; # } # -# // This is the type that implements the generated Service trait. It is the business logic +# // This is the type that implements the generated World trait. It is the business logic # // and is used to start the server. # #[derive(Clone)] # struct HelloServer; # -# impl Service for HelloServer { +# impl World for HelloServer { # // Each defined rpc generates two items in the trait, a fn that serves the RPC, and # // an associated type representing the future output by the fn. # @@ -179,19 +176,19 @@ async fn main() -> io::Result<()> { // incoming() takes a stream of transports such as would be returned by // TcpListener::incoming (but a stream instead of an iterator). .incoming(stream::once(future::ready(server_transport))) - // serve is generated by the service! macro. It takes as input any type implementing - // the generated Service trait. - .respond_with(serve(HelloServer)); + // serve_world is generated by the macro. It takes as input any type implementing + // the generated World trait. + .respond_with(serve_world(HelloServer)); let _ = runtime::spawn(server); - // new_stub is generated by the service! macro. Like Server, it takes a config and any + // world_stub is generated by the macro. Like Server, it takes a config and any // Transport as input, and returns a Client, also generated by the macro. - // by the service mcro. - let mut client = new_stub(client::Config::default(), client_transport).await?; + // by the service attribute. + let mut client = world_stub(client::Config::default(), client_transport).await?; - // The client has an RPC method for each RPC defined in service!. It takes the same args - // as defined, with the addition of a Context, which is always the first arg. The Context + // The client has an RPC method for each RPC defined in the annotated trait. It takes the same + // args as defined, with the addition of a Context, which is always the first arg. The Context // specifies a deadline and trace information which can be helpful in debugging requests. let hello = client.hello(context::current(), "Stim".to_string()).await?; diff --git a/example-service/src/client.rs b/example-service/src/client.rs index 1382fcc..1d60fc9 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -44,13 +44,12 @@ async fn main() -> io::Result<()> { let transport = json_transport::connect(&server_addr).await?; - // new_stub is generated by the service! macro. Like Server, it takes a config and any - // Transport as input, and returns a Client, also generated by the macro. - // by the service mcro. - let mut client = service::new_stub(client::Config::default(), transport).await?; + // world_stub is generated by the service attribute. Like Server, it takes a config and any + // Transport as input, and returns a Client, also generated by the attribute. + let mut client = service::world_stub(client::Config::default(), transport).await?; - // The client has an RPC method for each RPC defined in service!. It takes the same args - // as defined, with the addition of a Context, which is always the first arg. The Context + // The client has an RPC method for each RPC defined in the annotated trait. It takes the same + // args as defined, with the addition of a Context, which is always the first arg. The Context // specifies a deadline and trace information which can be helpful in debugging requests. let hello = client.hello(context::current(), name).await?; diff --git a/example-service/src/lib.rs b/example-service/src/lib.rs index 1bda58a..ee9f48f 100644 --- a/example-service/src/lib.rs +++ b/example-service/src/lib.rs @@ -9,7 +9,7 @@ /// This is the service definition. It looks a lot like a trait definition. /// It defines one RPC, hello, which takes one arg, name, and returns a String. #[tarpc::service] -pub trait Service { +pub trait World { /// Returns a greeting for name. async fn hello(name: String) -> String; } diff --git a/example-service/src/server.rs b/example-service/src/server.rs index 5a33824..542505c 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -17,12 +17,12 @@ use tarpc::{ server::{self, Channel, Handler}, }; -// This is the type that implements the generated Service trait. It is the business logic +// This is the type that implements the generated World trait. It is the business logic // and is used to start the server. #[derive(Clone)] struct HelloServer(SocketAddr); -impl service::Service for HelloServer { +impl service::World for HelloServer { // Each defined rpc generates two items in the trait, a fn that serves the RPC, and // an associated type representing the future output by the fn. @@ -70,11 +70,11 @@ async fn main() -> io::Result<()> { .map(server::BaseChannel::with_defaults) // Limit channels to 1 per IP. .max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip()) - // serve is generated by the service! macro. It takes as input any type implementing - // the generated Service trait. + // serve is generated by the service attribute. It takes as input any type implementing + // the generated World trait. .map(|channel| { let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap()); - channel.respond_with(service::serve(server)) + channel.respond_with(service::serve_world(server)) }) // Max 10 channels. .buffer_unordered(10) diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 60bcd46..0ec0d9e 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -196,6 +196,19 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { let service_name_repeated2 = service_name_repeated.clone(); let client_ident = Ident::new(&format!("{}Client", ident), ident.span()); + let request_ident = Ident::new(&format!("{}Request", ident), ident.span()); + let request_ident_repeated = std::iter::repeat(request_ident.clone()); + let request_ident_repeated2 = request_ident_repeated.clone(); + let response_ident = Ident::new(&format!("{}Response", ident), ident.span()); + let response_ident_repeated = std::iter::repeat(response_ident.clone()); + let response_ident_repeated2 = response_ident_repeated.clone(); + let response_fut_name = format!("{}ResponseFut", ident); + let response_fut_ident = Ident::new(&response_fut_name, ident.span()); + let response_fut_ident_repeated = std::iter::repeat(response_fut_ident.clone()); + let response_fut_ident_repeated2 = response_fut_ident_repeated.clone(); + let snake_ident = camel_to_snake(&ident.to_string()); + let serve_ident = Ident::new(&format!("serve_{}", snake_ident), ident.span()); + let stub_ident = Ident::new(&format!("{}_stub", snake_ident), ident.span()); #[cfg(feature = "serde1")] let derive_serialize = quote!(#[derive(serde::Serialize, serde::Deserialize)]); @@ -211,41 +224,41 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { /// The request sent over the wire from the client to the server. #[derive(Debug)] #derive_serialize - #vis enum Request { + #vis enum #request_ident { #( #camel_case_idents{ #args } ),* } /// The response sent over the wire from the server to the client. #[derive(Debug)] #derive_serialize - #vis enum Response { + #vis enum #response_ident { #( #camel_case_idents(#outputs) ),* } - /// A future resolving to a server [`Response`]. - #vis enum ResponseFut { + /// A future resolving to a server response. + #vis enum #response_fut_ident { #( #camel_case_idents(::#future_types) ),* } - impl std::fmt::Debug for ResponseFut { + impl std::fmt::Debug for #response_fut_ident { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - fmt.debug_struct("ResponseFut").finish() + fmt.debug_struct(#response_fut_name).finish() } } - impl std::future::Future for ResponseFut { - type Output = std::io::Result; + impl std::future::Future for #response_fut_ident { + type Output = std::io::Result<#response_ident>; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) - -> std::task::Poll> + -> std::task::Poll> { unsafe { match std::pin::Pin::get_unchecked_mut(self) { #( - ResponseFut::#camel_case_idents(resp) => + #response_fut_ident_repeated::#camel_case_idents(resp) => std::pin::Pin::new_unchecked(resp) .poll(cx) - .map(Response::#camel_case_idents2) + .map(#response_ident_repeated::#camel_case_idents2) .map(Ok), )* } @@ -254,13 +267,13 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { } /// Returns a serving function to use with tarpc::server::Server. - #vis fn serve(service: S) - -> impl FnOnce(tarpc::context::Context, Request) -> ResponseFut + Send + 'static + Clone { + #vis fn #serve_ident(service: S) + -> impl FnOnce(tarpc::context::Context, #request_ident) -> #response_fut_ident + Send + 'static + Clone { move |ctx, req| { match req { #( - Request::#camel_case_idents{ #arg_vars } => { - ResponseFut::#camel_case_idents2( + #request_ident_repeated::#camel_case_idents{ #arg_vars } => { + #response_fut_ident_repeated2::#camel_case_idents2( #service_name_repeated2::#method_names( service.clone(), ctx, #arg_vars2)) } @@ -272,19 +285,19 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { #[allow(unused)] #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a Future interface. - #vis struct #client_ident>(C); + #vis struct #client_ident>(C); /// Returns a new client stub that sends requests over the given transport. - #vis async fn new_stub(config: tarpc::client::Config, transport: T) + #vis async fn #stub_ident(config: tarpc::client::Config, transport: T) -> std::io::Result<#client_ident> where - T: tarpc::Transport, tarpc::Response> + Send + 'static, + T: tarpc::Transport, tarpc::Response<#response_ident>> + Send + 'static, { Ok(#client_ident(tarpc::client::new(config, transport).await?)) } impl From for #client_ident - where for <'a> C: tarpc::Client<'a, Request, Response = Response> + where for <'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident> { fn from(client: C) -> Self { #client_ident(client) @@ -292,18 +305,18 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { } impl #client_ident - where for<'a> C: tarpc::Client<'a, Request, Response = Response> + where for<'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident> { #( #[allow(unused)] #( #method_attrs )* pub fn #method_names(&mut self, ctx: tarpc::context::Context, #args) -> impl std::future::Future> + '_ { - let request = Request::#camel_case_idents { #arg_vars }; + let request = #request_ident_repeated2::#camel_case_idents { #arg_vars }; let resp = tarpc::Client::call(&mut self.0, ctx, request); async move { match resp.await? { - Response::#camel_case_idents2(msg) => std::result::Result::Ok(msg), + #response_ident_repeated2::#camel_case_idents2(msg) => std::result::Result::Ok(msg), _ => unreachable!(), } } @@ -334,6 +347,28 @@ fn snake_to_camel(ident_str: &str) -> String { camel_ty } +// Really basic camel to snake that assumes capitals are always the start of a new segment. +fn camel_to_snake(ident_str: &str) -> String { + let mut snake = String::new(); + let mut chars = ident_str.chars(); + if let Some(c) = chars.next() { + snake.extend(c.to_lowercase()); + } + + while let Some(c) = chars.next() { + if c.is_uppercase() { + // New word + snake.push('_'); + snake.extend(c.to_lowercase()); + } else { + // Same word + snake.push(c) + } + } + + snake +} + #[test] fn snake_to_camel_basic() { assert_eq!(snake_to_camel("abc_def"), "AbcDef"); @@ -358,3 +393,8 @@ fn snake_to_camel_underscore_consecutive() { fn snake_to_camel_capital_in_middle() { assert_eq!(snake_to_camel("aBc_dEf"), "AbcDef"); } + +#[test] +fn camel_to_snake_basic() { + assert_eq!(camel_to_snake("AbcDef"), "abc_def"); +} diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 6a4b11a..7e9efe3 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -28,9 +28,9 @@ assert_matches = "1.0" bincode = "1.0" bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" } bytes = { version = "0.4", features = ["serde"] } +env_logger = "0.6" futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } humantime = "1.0" -env_logger = "0.6" runtime = "0.3.0-alpha.6" runtime-tokio = "0.3.0-alpha.5" tokio-tcp = "0.1" diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 8c57b9b..fdfbeba 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -7,7 +7,6 @@ #![feature(async_await, existential_type)] use futures::{ - compat::Executor01CompatExt, future::{self, Ready}, prelude::*, Future, @@ -26,20 +25,17 @@ use std::{ }; pub mod subscriber { - pub use ServiceClient as Client; - #[tarpc::service] - pub trait Service { + pub trait Subscriber { async fn receive(message: String); } } pub mod publisher { use std::net::SocketAddr; - pub use ServiceClient as Client; #[tarpc::service] - pub trait Service { + pub trait Publisher { async fn broadcast(message: String); async fn subscribe(id: u32, address: SocketAddr) -> Result<(), String>; async fn unsubscribe(id: u32); @@ -51,7 +47,7 @@ struct Subscriber { id: u32, } -impl subscriber::Service for Subscriber { +impl subscriber::Subscriber for Subscriber { type ReceiveFut = Ready<()>; fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut { @@ -69,7 +65,7 @@ impl Subscriber { server::new(config) .incoming(incoming) .take(1) - .respond_with(subscriber::serve(Subscriber { id })), + .respond_with(subscriber::serve_subscriber(Subscriber { id })), ); Ok(addr) } @@ -77,7 +73,7 @@ impl Subscriber { #[derive(Clone, Debug)] struct Publisher { - clients: Arc>>, + clients: Arc>>, } impl Publisher { @@ -88,11 +84,14 @@ impl Publisher { } } -impl publisher::Service for Publisher { +impl publisher::Publisher for Publisher { existential type BroadcastFut: Future; fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut { - async fn broadcast(clients: Arc>>, message: String) { + async fn broadcast( + clients: Arc>>, + message: String, + ) { let mut clients = clients.lock().unwrap().clone(); for client in clients.values_mut() { // Ignore failing subscribers. In a real pubsub, @@ -109,12 +108,13 @@ impl publisher::Service for Publisher { fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut { async fn subscribe( - clients: Arc>>, + clients: Arc>>, id: u32, addr: SocketAddr, ) -> io::Result<()> { let conn = bincode_transport::connect(&addr).await?; - let subscriber = subscriber::new_stub(client::Config::default(), conn).await?; + let subscriber = + subscriber::subscriber_stub(client::Config::default(), conn).await?; eprintln!("Subscribing {}.", id); clients.lock().unwrap().insert(id, subscriber); Ok(()) @@ -149,7 +149,7 @@ async fn main() -> io::Result<()> { transport .take(1) .map(server::BaseChannel::with_defaults) - .respond_with(publisher::serve(Publisher::new())), + .respond_with(publisher::serve_publisher(Publisher::new())), ); let subscriber1 = Subscriber::listen(0, server::Config::default()).await?; @@ -157,7 +157,8 @@ async fn main() -> io::Result<()> { let publisher_conn = bincode_transport::connect(&publisher_addr); let publisher_conn = publisher_conn.await?; - let mut publisher = publisher::new_stub(client::Config::default(), publisher_conn).await?; + let mut publisher = + publisher::publisher_stub(client::Config::default(), publisher_conn).await?; if let Err(e) = publisher .subscribe(context::current(), 0, subscriber1) diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index 2aa2204..1f5d17b 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -16,20 +16,19 @@ use rpc::{ }; use std::io; -// This is the service definition. It looks a lot like a trait definition. -// It defines one RPC, hello, which takes one arg, name, and returns a String. - +/// This is the service definition. It looks a lot like a trait definition. +/// It defines one RPC, hello, which takes one arg, name, and returns a String. #[tarpc::service] -pub trait Service { +pub trait World { async fn hello(name: String) -> String; } -// This is the type that implements the generated Service trait. It is the business logic -// and is used to start the server. +/// This is the type that implements the generated World trait. It is the business logic +/// and is used to start the server. #[derive(Clone)] struct HelloServer; -impl Service for HelloServer { +impl World for HelloServer { // Each defined rpc generates two items in the trait, a fn that serves the RPC, and // an associated type representing the future output by the fn. @@ -56,22 +55,21 @@ async fn main() -> io::Result<()> { // BaseChannel is the most basic channel, simply wrapping a transport with no added // functionality. BaseChannel::with_defaults(client) - // serve is generated by the tarpc::service! macro. It takes as input any type implementing - // the generated Service trait. - .respond_with(serve(HelloServer)) + // serve_world is generated by the tarpc::service attribute. It takes as input any type + // implementing the generated World trait. + .respond_with(serve_world(HelloServer)) .await; }; let _ = runtime::spawn(server); let transport = bincode_transport::connect(&addr).await?; - // new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any - // Transport as input, and returns a Client, also generated by the macro. - // by the service mcro. - let mut client = new_stub(client::Config::default(), transport).await?; + // world_stub is generated by the tarpc::service attribute. Like Server, it takes a config and + // any Transport as input, and returns a Client, also generated by the attribute. + let mut client = world_stub(client::Config::default(), transport).await?; - // The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args - // as defined, with the addition of a Context, which is always the first arg. The Context + // The client has an RPC method for each RPC defined in the annotated trait. It takes the same + // args as defined, with the addition of a Context, which is always the first arg. The Context // specifies a deadline and trace information which can be helpful in debugging requests. let hello = client.hello(context::current(), "Stim".to_string()).await?; diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 90b0b96..09cc166 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -6,7 +6,7 @@ #![feature(existential_type, async_await)] -use crate::{add::Service as AddService, double::Service as DoubleService}; +use crate::{add::Add as AddService, double::Double as DoubleService}; use futures::{ future::{self, Ready}, prelude::*, @@ -18,20 +18,16 @@ use rpc::{ use std::io; pub mod add { - pub use ServiceClient as Client; - #[tarpc::service] - pub trait Service { + pub trait Add { /// Add two ints together. async fn add(x: i32, y: i32) -> i32; } } pub mod double { - pub use ServiceClient as Client; - #[tarpc::service] - pub trait Service { + pub trait Double { /// 2 * x async fn double(x: i32) -> Result; } @@ -50,14 +46,14 @@ impl AddService for AddServer { #[derive(Clone)] struct DoubleServer { - add_client: add::Client, + add_client: add::AddClient, } impl DoubleService for DoubleServer { existential type DoubleFut: Future> + Send; fn double(self, _: context::Context, x: i32) -> Self::DoubleFut { - async fn double(mut client: add::Client, x: i32) -> Result { + async fn double(mut client: add::AddClient, x: i32) -> Result { client .add(context::current(), x, x) .await @@ -78,11 +74,11 @@ async fn main() -> io::Result<()> { let add_server = Server::default() .incoming(add_listener) .take(1) - .respond_with(add::serve(AddServer)); + .respond_with(add::serve_add(AddServer)); let _ = runtime::spawn(add_server); let to_add_server = bincode_transport::connect(&addr).await?; - let add_client = add::new_stub(client::Config::default(), to_add_server).await?; + let add_client = add::add_stub(client::Config::default(), to_add_server).await?; let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); @@ -90,11 +86,12 @@ async fn main() -> io::Result<()> { let double_server = rpc::Server::default() .incoming(double_listener) .take(1) - .respond_with(double::serve(DoubleServer { add_client })); + .respond_with(double::serve_double(DoubleServer { add_client })); let _ = runtime::spawn(double_server); let to_double_server = bincode_transport::connect(&addr).await?; - let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?; + let mut double_client = + double::double_stub(client::Config::default(), to_double_server).await?; for i in 1..=5 { eprintln!("{:?}", double_client.double(context::current(), i).await?); diff --git a/tarpc/tests/service_functional.rs b/tarpc/tests/service_functional.rs index 4ac5ef7..b9913d3 100644 --- a/tarpc/tests/service_functional.rs +++ b/tarpc/tests/service_functional.rs @@ -1,13 +1,20 @@ #![feature(async_await)] +#[cfg(not(feature = "serde1"))] +use std::rc::Rc; + use assert_matches::assert_matches; use futures::{ future::{ready, Ready}, prelude::*, }; -#[cfg(feature = "serde1")] + use std::io; -use tarpc::{client, context, server::Handler, transport::channel}; +use tarpc::{ + client, context, + server::{self, BaseChannel, Channel, Handler}, + transport::channel, +}; #[tarpc_plugins::service] trait Service { @@ -33,25 +40,24 @@ impl Service for Server { } #[runtime::test(runtime_tokio::TokioCurrentThread)] -async fn sequential() { +async fn sequential() -> io::Result<()> { let _ = env_logger::try_init(); let (tx, rx) = channel::unbounded(); + let _ = runtime::spawn( - tarpc::Server::default() - .incoming(stream::once(ready(rx))) - .respond_with(serve(Server)), + BaseChannel::new(server::Config::default(), rx) + .respond_with(serve_service(Server)) ); - let mut client = new_stub(client::Config::default(), tx).await.unwrap(); - assert_eq!(3, client.add(context::current(), 1, 2).await.unwrap()); - assert_eq!( - "Hey, Tim.", - client - .hey(context::current(), "Tim".to_string()) - .await - .unwrap() - ); + let mut client = service_stub(client::Config::default(), tx).await?; + + assert_matches!(client.add(context::current(), 1, 2).await, Ok(3)); + assert_matches!( + client.hey(context::current(), "Tim".into()).await, + Ok(ref s) if s == "Hey, Tim."); + + Ok(()) } #[cfg(feature = "serde1")] @@ -64,11 +70,11 @@ async fn serde() -> io::Result<()> { let _ = runtime::spawn( tarpc::Server::default() .incoming(transport.take(1).filter_map(|r| async { r.ok() })) - .respond_with(serve(Server)), + .respond_with(serve_service(Server)), ); let transport = bincode_transport::connect(&addr).await?; - let mut client = new_stub(client::Config::default(), transport).await?; + let mut client = service_stub(client::Config::default(), transport).await?; assert_matches!(client.add(context::current(), 1, 2).await, Ok(3)); assert_matches!( @@ -80,25 +86,30 @@ async fn serde() -> io::Result<()> { } #[runtime::test(runtime_tokio::TokioCurrentThread)] -async fn concurrent() { +async fn concurrent() -> io::Result<()> { let _ = env_logger::try_init(); let (tx, rx) = channel::unbounded(); let _ = runtime::spawn( rpc::Server::default() .incoming(stream::once(ready(rx))) - .respond_with(serve(Server)), + .respond_with(serve_service(Server)), ); - let client = new_stub(client::Config::default(), tx).await.unwrap(); + let client = service_stub(client::Config::default(), tx).await?; + let mut c = client.clone(); let req1 = c.add(context::current(), 1, 2); + let mut c = client.clone(); let req2 = c.add(context::current(), 3, 4); + let mut c = client.clone(); let req3 = c.hey(context::current(), "Tim".to_string()); - assert_eq!(3, req1.await.unwrap()); - assert_eq!(7, req2.await.unwrap()); - assert_eq!("Hey, Tim.", req3.await.unwrap()); + assert_matches!(req1.await, Ok(3)); + assert_matches!(req2.await, Ok(7)); + assert_matches!(req3.await, Ok(ref s) if s == "Hey, Tim."); + + Ok(()) }