From 5c485fe608c6e4dc209ed2e559b4932609190688 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 21 Jul 2019 23:41:33 -0700 Subject: [PATCH 1/5] Add some tests for snake to camel case conversion. --- bincode-transport/Cargo.toml | 1 - plugins/Cargo.toml | 12 +------ plugins/src/lib.rs | 66 +++++++++++++++++++++++------------- plugins/tests/service.rs | 4 +-- 4 files changed, 45 insertions(+), 38 deletions(-) diff --git a/bincode-transport/Cargo.toml b/bincode-transport/Cargo.toml index aaff2b2..418a647 100644 --- a/bincode-transport/Cargo.toml +++ b/bincode-transport/Cargo.toml @@ -13,7 +13,6 @@ readme = "../README.md" description = "A bincode-based transport for tarpc services." [dependencies] -bincode = "1" futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } futures_legacy = { version = "0.1", package = "futures" } pin-utils = "0.1.0-alpha.4" diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index 0398608..c3dc776 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -19,8 +19,7 @@ serde1 = [] travis-ci = { repository = "google/tarpc" } [dependencies] -itertools = "0.8" -syn = { version = "0.15", features = ["full", "extra-traits"] } +syn = { version = "0.15", features = ["full"] } quote = "0.6" proc-macro2 = "0.4" @@ -28,15 +27,6 @@ proc-macro2 = "0.4" proc-macro = true [dev-dependencies] -bincode = "1" -bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" } -bytes = { version = "0.4", features = ["serde"] } futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } -humantime = "1.0" -env_logger = "0.6" serde = { version = "1.0", features = ["derive"] } tarpc = { path = "../tarpc" } -tokio = "0.1" -tokio-executor = "0.1" -tokio-tcp = "0.1" -pin-utils = "0.1.0-alpha.4" diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 6c85758..60bcd46 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -6,13 +6,11 @@ #![recursion_limit = "512"] -extern crate itertools; extern crate proc_macro; extern crate proc_macro2; extern crate quote; extern crate syn; -use itertools::Itertools; use proc_macro::TokenStream; use proc_macro2::TokenStream as TokenStream2; use quote::quote; @@ -114,6 +112,13 @@ impl Parse for RpcMethod { } } +/// Generates: +/// - service trait +/// - serve fn +/// - client stub struct +/// - new_stub client factory fn +/// - Request and Response enums +/// - ResponseFut Future #[proc_macro_attribute] pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { struct EmptyArgs; @@ -133,7 +138,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { let camel_case_fn_names: Vec = rpcs .iter() - .map(|rpc| convert_str(&rpc.ident.to_string())) + .map(|rpc| snake_to_camel(&rpc.ident.to_string())) .collect(); let ref outputs: Vec = rpcs .iter() @@ -310,33 +315,46 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { tokens.into() } -fn convert_str(ident_str: &str) -> String { +fn snake_to_camel(ident_str: &str) -> String { let mut camel_ty = String::new(); - - // Find the first non-underscore and add it capitalized. let mut chars = ident_str.chars(); - // Find the first non-underscore char, uppercase it, and append it. - // Guaranteed to succeed because all idents must have at least one non-underscore char. - camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase()); - - // When we find an underscore, we remove it and capitalize the next char. To do this, - // we need to ensure the next char is not another underscore. - let mut chars = chars.coalesce(|c1, c2| { - if c1 == '_' && c2 == '_' { - Ok(c1) - } else { - Err((c1, c2)) - } - }); - + let mut last_char_was_underscore = true; while let Some(c) = chars.next() { - if c != '_' { - camel_ty.push(c); - } else if let Some(c) = chars.next() { - camel_ty.extend(c.to_uppercase()); + match c { + '_' => last_char_was_underscore = true, + c if last_char_was_underscore => { + camel_ty.extend(c.to_uppercase()); + last_char_was_underscore = false; + } + c => camel_ty.extend(c.to_lowercase()), } } camel_ty } + +#[test] +fn snake_to_camel_basic() { + assert_eq!(snake_to_camel("abc_def"), "AbcDef"); +} + +#[test] +fn snake_to_camel_underscore_suffix() { + assert_eq!(snake_to_camel("abc_def_"), "AbcDef"); +} + +#[test] +fn snake_to_camel_underscore_prefix() { + assert_eq!(snake_to_camel("_abc_def"), "AbcDef"); +} + +#[test] +fn snake_to_camel_underscore_consecutive() { + assert_eq!(snake_to_camel("abc__def"), "AbcDef"); +} + +#[test] +fn snake_to_camel_capital_in_middle() { + assert_eq!(snake_to_camel("aBc_dEf"), "AbcDef"); +} diff --git a/plugins/tests/service.rs b/plugins/tests/service.rs index fde4e10..e715894 100644 --- a/plugins/tests/service.rs +++ b/plugins/tests/service.rs @@ -6,7 +6,7 @@ use tarpc::context; fn att_service_trait() { use futures::future::{ready, Ready}; - #[tarpc_plugins::service] + #[tarpc::service] trait Foo { async fn two_part(s: String, i: i32) -> (String, i32); async fn bar(s: String) -> String; @@ -33,7 +33,7 @@ fn att_service_trait() { #[test] fn syntax() { - #[tarpc_plugins::service] + #[tarpc::service] trait Syntax { #[deny(warnings)] #[allow(non_snake_case)] From 2f24842b2d1b74b2914d0ce0a184d44b7032e4bb Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 23 Jul 2019 01:47:14 -0700 Subject: [PATCH 2/5] Add service name to generated items. With this change, the service definitions don't need to be isolated in their own modules. --- README.md | 41 ++++++------ example-service/src/client.rs | 11 ++-- example-service/src/lib.rs | 2 +- example-service/src/server.rs | 10 +-- plugins/src/lib.rs | 84 ++++++++++++++++++------- tarpc/Cargo.toml | 2 +- tarpc/examples/pubsub.rs | 31 ++++----- tarpc/examples/readme.rs | 30 +++++---- tarpc/examples/server_calling_server.rs | 23 +++---- tarpc/tests/service_functional.rs | 57 ++++++++++------- 10 files changed, 167 insertions(+), 124 deletions(-) diff --git a/README.md b/README.md index dc52e99..d639f63 100644 --- a/README.md +++ b/README.md @@ -36,12 +36,9 @@ Add to your `Cargo.toml` dependencies: tarpc = "0.18.0" ``` -The `service!` macro expands to a collection of items that form an rpc service. -In the above example, the macro is called within the `hello_service` module. -This module will contain a `Client` stub and `Service` trait. These generated -types make it easy and ergonomic to write servers without dealing with -serialization directly. Simply implement one of the generated traits, and you're -off to the races! +The `tarpc::service` attribute expands to a collection of items that form an rpc service. +These generated types make it easy and ergonomic to write servers with less boilerplate. +Simply implement the generated service trait, and you're off to the races! ## Example @@ -77,13 +74,13 @@ use std::io; // This is the service definition. It looks a lot like a trait definition. // It defines one RPC, hello, which takes one arg, name, and returns a String. #[tarpc::service] -trait Service { +trait World { /// Returns a greeting for name. async fn hello(name: String) -> String; } ``` -This service definition generates a trait called `Service`. Next we need to +This service definition generates a trait called `World`. Next we need to implement it for our Server struct. ```rust @@ -104,17 +101,17 @@ implement it for our Server struct. # // This is the service definition. It looks a lot like a trait definition. # // It defines one RPC, hello, which takes one arg, name, and returns a String. # #[tarpc::service] -# trait Service { +# trait World { # /// Returns a greeting for name. # async fn hello(name: String) -> String; # } # -// This is the type that implements the generated Service trait. It is the business logic +// This is the type that implements the generated World trait. It is the business logic // and is used to start the server. #[derive(Clone)] struct HelloServer; -impl Service for HelloServer { +impl World for HelloServer { // Each defined rpc generates two items in the trait, a fn that serves the RPC, and // an associated type representing the future output by the fn. @@ -150,17 +147,17 @@ that uses bincode over TCP. # // This is the service definition. It looks a lot like a trait definition. # // It defines one RPC, hello, which takes one arg, name, and returns a String. # #[tarpc::service] -# trait Service { +# trait World { # /// Returns a greeting for name. # async fn hello(name: String) -> String; # } # -# // This is the type that implements the generated Service trait. It is the business logic +# // This is the type that implements the generated World trait. It is the business logic # // and is used to start the server. # #[derive(Clone)] # struct HelloServer; # -# impl Service for HelloServer { +# impl World for HelloServer { # // Each defined rpc generates two items in the trait, a fn that serves the RPC, and # // an associated type representing the future output by the fn. # @@ -179,19 +176,19 @@ async fn main() -> io::Result<()> { // incoming() takes a stream of transports such as would be returned by // TcpListener::incoming (but a stream instead of an iterator). .incoming(stream::once(future::ready(server_transport))) - // serve is generated by the service! macro. It takes as input any type implementing - // the generated Service trait. - .respond_with(serve(HelloServer)); + // serve_world is generated by the macro. It takes as input any type implementing + // the generated World trait. + .respond_with(serve_world(HelloServer)); let _ = runtime::spawn(server); - // new_stub is generated by the service! macro. Like Server, it takes a config and any + // world_stub is generated by the macro. Like Server, it takes a config and any // Transport as input, and returns a Client, also generated by the macro. - // by the service mcro. - let mut client = new_stub(client::Config::default(), client_transport).await?; + // by the service attribute. + let mut client = world_stub(client::Config::default(), client_transport).await?; - // The client has an RPC method for each RPC defined in service!. It takes the same args - // as defined, with the addition of a Context, which is always the first arg. The Context + // The client has an RPC method for each RPC defined in the annotated trait. It takes the same + // args as defined, with the addition of a Context, which is always the first arg. The Context // specifies a deadline and trace information which can be helpful in debugging requests. let hello = client.hello(context::current(), "Stim".to_string()).await?; diff --git a/example-service/src/client.rs b/example-service/src/client.rs index 1382fcc..1d60fc9 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -44,13 +44,12 @@ async fn main() -> io::Result<()> { let transport = json_transport::connect(&server_addr).await?; - // new_stub is generated by the service! macro. Like Server, it takes a config and any - // Transport as input, and returns a Client, also generated by the macro. - // by the service mcro. - let mut client = service::new_stub(client::Config::default(), transport).await?; + // world_stub is generated by the service attribute. Like Server, it takes a config and any + // Transport as input, and returns a Client, also generated by the attribute. + let mut client = service::world_stub(client::Config::default(), transport).await?; - // The client has an RPC method for each RPC defined in service!. It takes the same args - // as defined, with the addition of a Context, which is always the first arg. The Context + // The client has an RPC method for each RPC defined in the annotated trait. It takes the same + // args as defined, with the addition of a Context, which is always the first arg. The Context // specifies a deadline and trace information which can be helpful in debugging requests. let hello = client.hello(context::current(), name).await?; diff --git a/example-service/src/lib.rs b/example-service/src/lib.rs index 1bda58a..ee9f48f 100644 --- a/example-service/src/lib.rs +++ b/example-service/src/lib.rs @@ -9,7 +9,7 @@ /// This is the service definition. It looks a lot like a trait definition. /// It defines one RPC, hello, which takes one arg, name, and returns a String. #[tarpc::service] -pub trait Service { +pub trait World { /// Returns a greeting for name. async fn hello(name: String) -> String; } diff --git a/example-service/src/server.rs b/example-service/src/server.rs index 5a33824..542505c 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -17,12 +17,12 @@ use tarpc::{ server::{self, Channel, Handler}, }; -// This is the type that implements the generated Service trait. It is the business logic +// This is the type that implements the generated World trait. It is the business logic // and is used to start the server. #[derive(Clone)] struct HelloServer(SocketAddr); -impl service::Service for HelloServer { +impl service::World for HelloServer { // Each defined rpc generates two items in the trait, a fn that serves the RPC, and // an associated type representing the future output by the fn. @@ -70,11 +70,11 @@ async fn main() -> io::Result<()> { .map(server::BaseChannel::with_defaults) // Limit channels to 1 per IP. .max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip()) - // serve is generated by the service! macro. It takes as input any type implementing - // the generated Service trait. + // serve is generated by the service attribute. It takes as input any type implementing + // the generated World trait. .map(|channel| { let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap()); - channel.respond_with(service::serve(server)) + channel.respond_with(service::serve_world(server)) }) // Max 10 channels. .buffer_unordered(10) diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 60bcd46..0ec0d9e 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -196,6 +196,19 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { let service_name_repeated2 = service_name_repeated.clone(); let client_ident = Ident::new(&format!("{}Client", ident), ident.span()); + let request_ident = Ident::new(&format!("{}Request", ident), ident.span()); + let request_ident_repeated = std::iter::repeat(request_ident.clone()); + let request_ident_repeated2 = request_ident_repeated.clone(); + let response_ident = Ident::new(&format!("{}Response", ident), ident.span()); + let response_ident_repeated = std::iter::repeat(response_ident.clone()); + let response_ident_repeated2 = response_ident_repeated.clone(); + let response_fut_name = format!("{}ResponseFut", ident); + let response_fut_ident = Ident::new(&response_fut_name, ident.span()); + let response_fut_ident_repeated = std::iter::repeat(response_fut_ident.clone()); + let response_fut_ident_repeated2 = response_fut_ident_repeated.clone(); + let snake_ident = camel_to_snake(&ident.to_string()); + let serve_ident = Ident::new(&format!("serve_{}", snake_ident), ident.span()); + let stub_ident = Ident::new(&format!("{}_stub", snake_ident), ident.span()); #[cfg(feature = "serde1")] let derive_serialize = quote!(#[derive(serde::Serialize, serde::Deserialize)]); @@ -211,41 +224,41 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { /// The request sent over the wire from the client to the server. #[derive(Debug)] #derive_serialize - #vis enum Request { + #vis enum #request_ident { #( #camel_case_idents{ #args } ),* } /// The response sent over the wire from the server to the client. #[derive(Debug)] #derive_serialize - #vis enum Response { + #vis enum #response_ident { #( #camel_case_idents(#outputs) ),* } - /// A future resolving to a server [`Response`]. - #vis enum ResponseFut { + /// A future resolving to a server response. + #vis enum #response_fut_ident { #( #camel_case_idents(::#future_types) ),* } - impl std::fmt::Debug for ResponseFut { + impl std::fmt::Debug for #response_fut_ident { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - fmt.debug_struct("ResponseFut").finish() + fmt.debug_struct(#response_fut_name).finish() } } - impl std::future::Future for ResponseFut { - type Output = std::io::Result; + impl std::future::Future for #response_fut_ident { + type Output = std::io::Result<#response_ident>; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) - -> std::task::Poll> + -> std::task::Poll> { unsafe { match std::pin::Pin::get_unchecked_mut(self) { #( - ResponseFut::#camel_case_idents(resp) => + #response_fut_ident_repeated::#camel_case_idents(resp) => std::pin::Pin::new_unchecked(resp) .poll(cx) - .map(Response::#camel_case_idents2) + .map(#response_ident_repeated::#camel_case_idents2) .map(Ok), )* } @@ -254,13 +267,13 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { } /// Returns a serving function to use with tarpc::server::Server. - #vis fn serve(service: S) - -> impl FnOnce(tarpc::context::Context, Request) -> ResponseFut + Send + 'static + Clone { + #vis fn #serve_ident(service: S) + -> impl FnOnce(tarpc::context::Context, #request_ident) -> #response_fut_ident + Send + 'static + Clone { move |ctx, req| { match req { #( - Request::#camel_case_idents{ #arg_vars } => { - ResponseFut::#camel_case_idents2( + #request_ident_repeated::#camel_case_idents{ #arg_vars } => { + #response_fut_ident_repeated2::#camel_case_idents2( #service_name_repeated2::#method_names( service.clone(), ctx, #arg_vars2)) } @@ -272,19 +285,19 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { #[allow(unused)] #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a Future interface. - #vis struct #client_ident>(C); + #vis struct #client_ident>(C); /// Returns a new client stub that sends requests over the given transport. - #vis async fn new_stub(config: tarpc::client::Config, transport: T) + #vis async fn #stub_ident(config: tarpc::client::Config, transport: T) -> std::io::Result<#client_ident> where - T: tarpc::Transport, tarpc::Response> + Send + 'static, + T: tarpc::Transport, tarpc::Response<#response_ident>> + Send + 'static, { Ok(#client_ident(tarpc::client::new(config, transport).await?)) } impl From for #client_ident - where for <'a> C: tarpc::Client<'a, Request, Response = Response> + where for <'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident> { fn from(client: C) -> Self { #client_ident(client) @@ -292,18 +305,18 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { } impl #client_ident - where for<'a> C: tarpc::Client<'a, Request, Response = Response> + where for<'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident> { #( #[allow(unused)] #( #method_attrs )* pub fn #method_names(&mut self, ctx: tarpc::context::Context, #args) -> impl std::future::Future> + '_ { - let request = Request::#camel_case_idents { #arg_vars }; + let request = #request_ident_repeated2::#camel_case_idents { #arg_vars }; let resp = tarpc::Client::call(&mut self.0, ctx, request); async move { match resp.await? { - Response::#camel_case_idents2(msg) => std::result::Result::Ok(msg), + #response_ident_repeated2::#camel_case_idents2(msg) => std::result::Result::Ok(msg), _ => unreachable!(), } } @@ -334,6 +347,28 @@ fn snake_to_camel(ident_str: &str) -> String { camel_ty } +// Really basic camel to snake that assumes capitals are always the start of a new segment. +fn camel_to_snake(ident_str: &str) -> String { + let mut snake = String::new(); + let mut chars = ident_str.chars(); + if let Some(c) = chars.next() { + snake.extend(c.to_lowercase()); + } + + while let Some(c) = chars.next() { + if c.is_uppercase() { + // New word + snake.push('_'); + snake.extend(c.to_lowercase()); + } else { + // Same word + snake.push(c) + } + } + + snake +} + #[test] fn snake_to_camel_basic() { assert_eq!(snake_to_camel("abc_def"), "AbcDef"); @@ -358,3 +393,8 @@ fn snake_to_camel_underscore_consecutive() { fn snake_to_camel_capital_in_middle() { assert_eq!(snake_to_camel("aBc_dEf"), "AbcDef"); } + +#[test] +fn camel_to_snake_basic() { + assert_eq!(camel_to_snake("AbcDef"), "abc_def"); +} diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 6a4b11a..7e9efe3 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -28,9 +28,9 @@ assert_matches = "1.0" bincode = "1.0" bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" } bytes = { version = "0.4", features = ["serde"] } +env_logger = "0.6" futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } humantime = "1.0" -env_logger = "0.6" runtime = "0.3.0-alpha.6" runtime-tokio = "0.3.0-alpha.5" tokio-tcp = "0.1" diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 8c57b9b..fdfbeba 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -7,7 +7,6 @@ #![feature(async_await, existential_type)] use futures::{ - compat::Executor01CompatExt, future::{self, Ready}, prelude::*, Future, @@ -26,20 +25,17 @@ use std::{ }; pub mod subscriber { - pub use ServiceClient as Client; - #[tarpc::service] - pub trait Service { + pub trait Subscriber { async fn receive(message: String); } } pub mod publisher { use std::net::SocketAddr; - pub use ServiceClient as Client; #[tarpc::service] - pub trait Service { + pub trait Publisher { async fn broadcast(message: String); async fn subscribe(id: u32, address: SocketAddr) -> Result<(), String>; async fn unsubscribe(id: u32); @@ -51,7 +47,7 @@ struct Subscriber { id: u32, } -impl subscriber::Service for Subscriber { +impl subscriber::Subscriber for Subscriber { type ReceiveFut = Ready<()>; fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut { @@ -69,7 +65,7 @@ impl Subscriber { server::new(config) .incoming(incoming) .take(1) - .respond_with(subscriber::serve(Subscriber { id })), + .respond_with(subscriber::serve_subscriber(Subscriber { id })), ); Ok(addr) } @@ -77,7 +73,7 @@ impl Subscriber { #[derive(Clone, Debug)] struct Publisher { - clients: Arc>>, + clients: Arc>>, } impl Publisher { @@ -88,11 +84,14 @@ impl Publisher { } } -impl publisher::Service for Publisher { +impl publisher::Publisher for Publisher { existential type BroadcastFut: Future; fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut { - async fn broadcast(clients: Arc>>, message: String) { + async fn broadcast( + clients: Arc>>, + message: String, + ) { let mut clients = clients.lock().unwrap().clone(); for client in clients.values_mut() { // Ignore failing subscribers. In a real pubsub, @@ -109,12 +108,13 @@ impl publisher::Service for Publisher { fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut { async fn subscribe( - clients: Arc>>, + clients: Arc>>, id: u32, addr: SocketAddr, ) -> io::Result<()> { let conn = bincode_transport::connect(&addr).await?; - let subscriber = subscriber::new_stub(client::Config::default(), conn).await?; + let subscriber = + subscriber::subscriber_stub(client::Config::default(), conn).await?; eprintln!("Subscribing {}.", id); clients.lock().unwrap().insert(id, subscriber); Ok(()) @@ -149,7 +149,7 @@ async fn main() -> io::Result<()> { transport .take(1) .map(server::BaseChannel::with_defaults) - .respond_with(publisher::serve(Publisher::new())), + .respond_with(publisher::serve_publisher(Publisher::new())), ); let subscriber1 = Subscriber::listen(0, server::Config::default()).await?; @@ -157,7 +157,8 @@ async fn main() -> io::Result<()> { let publisher_conn = bincode_transport::connect(&publisher_addr); let publisher_conn = publisher_conn.await?; - let mut publisher = publisher::new_stub(client::Config::default(), publisher_conn).await?; + let mut publisher = + publisher::publisher_stub(client::Config::default(), publisher_conn).await?; if let Err(e) = publisher .subscribe(context::current(), 0, subscriber1) diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index 2aa2204..1f5d17b 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -16,20 +16,19 @@ use rpc::{ }; use std::io; -// This is the service definition. It looks a lot like a trait definition. -// It defines one RPC, hello, which takes one arg, name, and returns a String. - +/// This is the service definition. It looks a lot like a trait definition. +/// It defines one RPC, hello, which takes one arg, name, and returns a String. #[tarpc::service] -pub trait Service { +pub trait World { async fn hello(name: String) -> String; } -// This is the type that implements the generated Service trait. It is the business logic -// and is used to start the server. +/// This is the type that implements the generated World trait. It is the business logic +/// and is used to start the server. #[derive(Clone)] struct HelloServer; -impl Service for HelloServer { +impl World for HelloServer { // Each defined rpc generates two items in the trait, a fn that serves the RPC, and // an associated type representing the future output by the fn. @@ -56,22 +55,21 @@ async fn main() -> io::Result<()> { // BaseChannel is the most basic channel, simply wrapping a transport with no added // functionality. BaseChannel::with_defaults(client) - // serve is generated by the tarpc::service! macro. It takes as input any type implementing - // the generated Service trait. - .respond_with(serve(HelloServer)) + // serve_world is generated by the tarpc::service attribute. It takes as input any type + // implementing the generated World trait. + .respond_with(serve_world(HelloServer)) .await; }; let _ = runtime::spawn(server); let transport = bincode_transport::connect(&addr).await?; - // new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any - // Transport as input, and returns a Client, also generated by the macro. - // by the service mcro. - let mut client = new_stub(client::Config::default(), transport).await?; + // world_stub is generated by the tarpc::service attribute. Like Server, it takes a config and + // any Transport as input, and returns a Client, also generated by the attribute. + let mut client = world_stub(client::Config::default(), transport).await?; - // The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args - // as defined, with the addition of a Context, which is always the first arg. The Context + // The client has an RPC method for each RPC defined in the annotated trait. It takes the same + // args as defined, with the addition of a Context, which is always the first arg. The Context // specifies a deadline and trace information which can be helpful in debugging requests. let hello = client.hello(context::current(), "Stim".to_string()).await?; diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 90b0b96..09cc166 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -6,7 +6,7 @@ #![feature(existential_type, async_await)] -use crate::{add::Service as AddService, double::Service as DoubleService}; +use crate::{add::Add as AddService, double::Double as DoubleService}; use futures::{ future::{self, Ready}, prelude::*, @@ -18,20 +18,16 @@ use rpc::{ use std::io; pub mod add { - pub use ServiceClient as Client; - #[tarpc::service] - pub trait Service { + pub trait Add { /// Add two ints together. async fn add(x: i32, y: i32) -> i32; } } pub mod double { - pub use ServiceClient as Client; - #[tarpc::service] - pub trait Service { + pub trait Double { /// 2 * x async fn double(x: i32) -> Result; } @@ -50,14 +46,14 @@ impl AddService for AddServer { #[derive(Clone)] struct DoubleServer { - add_client: add::Client, + add_client: add::AddClient, } impl DoubleService for DoubleServer { existential type DoubleFut: Future> + Send; fn double(self, _: context::Context, x: i32) -> Self::DoubleFut { - async fn double(mut client: add::Client, x: i32) -> Result { + async fn double(mut client: add::AddClient, x: i32) -> Result { client .add(context::current(), x, x) .await @@ -78,11 +74,11 @@ async fn main() -> io::Result<()> { let add_server = Server::default() .incoming(add_listener) .take(1) - .respond_with(add::serve(AddServer)); + .respond_with(add::serve_add(AddServer)); let _ = runtime::spawn(add_server); let to_add_server = bincode_transport::connect(&addr).await?; - let add_client = add::new_stub(client::Config::default(), to_add_server).await?; + let add_client = add::add_stub(client::Config::default(), to_add_server).await?; let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); @@ -90,11 +86,12 @@ async fn main() -> io::Result<()> { let double_server = rpc::Server::default() .incoming(double_listener) .take(1) - .respond_with(double::serve(DoubleServer { add_client })); + .respond_with(double::serve_double(DoubleServer { add_client })); let _ = runtime::spawn(double_server); let to_double_server = bincode_transport::connect(&addr).await?; - let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?; + let mut double_client = + double::double_stub(client::Config::default(), to_double_server).await?; for i in 1..=5 { eprintln!("{:?}", double_client.double(context::current(), i).await?); diff --git a/tarpc/tests/service_functional.rs b/tarpc/tests/service_functional.rs index 4ac5ef7..b9913d3 100644 --- a/tarpc/tests/service_functional.rs +++ b/tarpc/tests/service_functional.rs @@ -1,13 +1,20 @@ #![feature(async_await)] +#[cfg(not(feature = "serde1"))] +use std::rc::Rc; + use assert_matches::assert_matches; use futures::{ future::{ready, Ready}, prelude::*, }; -#[cfg(feature = "serde1")] + use std::io; -use tarpc::{client, context, server::Handler, transport::channel}; +use tarpc::{ + client, context, + server::{self, BaseChannel, Channel, Handler}, + transport::channel, +}; #[tarpc_plugins::service] trait Service { @@ -33,25 +40,24 @@ impl Service for Server { } #[runtime::test(runtime_tokio::TokioCurrentThread)] -async fn sequential() { +async fn sequential() -> io::Result<()> { let _ = env_logger::try_init(); let (tx, rx) = channel::unbounded(); + let _ = runtime::spawn( - tarpc::Server::default() - .incoming(stream::once(ready(rx))) - .respond_with(serve(Server)), + BaseChannel::new(server::Config::default(), rx) + .respond_with(serve_service(Server)) ); - let mut client = new_stub(client::Config::default(), tx).await.unwrap(); - assert_eq!(3, client.add(context::current(), 1, 2).await.unwrap()); - assert_eq!( - "Hey, Tim.", - client - .hey(context::current(), "Tim".to_string()) - .await - .unwrap() - ); + let mut client = service_stub(client::Config::default(), tx).await?; + + assert_matches!(client.add(context::current(), 1, 2).await, Ok(3)); + assert_matches!( + client.hey(context::current(), "Tim".into()).await, + Ok(ref s) if s == "Hey, Tim."); + + Ok(()) } #[cfg(feature = "serde1")] @@ -64,11 +70,11 @@ async fn serde() -> io::Result<()> { let _ = runtime::spawn( tarpc::Server::default() .incoming(transport.take(1).filter_map(|r| async { r.ok() })) - .respond_with(serve(Server)), + .respond_with(serve_service(Server)), ); let transport = bincode_transport::connect(&addr).await?; - let mut client = new_stub(client::Config::default(), transport).await?; + let mut client = service_stub(client::Config::default(), transport).await?; assert_matches!(client.add(context::current(), 1, 2).await, Ok(3)); assert_matches!( @@ -80,25 +86,30 @@ async fn serde() -> io::Result<()> { } #[runtime::test(runtime_tokio::TokioCurrentThread)] -async fn concurrent() { +async fn concurrent() -> io::Result<()> { let _ = env_logger::try_init(); let (tx, rx) = channel::unbounded(); let _ = runtime::spawn( rpc::Server::default() .incoming(stream::once(ready(rx))) - .respond_with(serve(Server)), + .respond_with(serve_service(Server)), ); - let client = new_stub(client::Config::default(), tx).await.unwrap(); + let client = service_stub(client::Config::default(), tx).await?; + let mut c = client.clone(); let req1 = c.add(context::current(), 1, 2); + let mut c = client.clone(); let req2 = c.add(context::current(), 3, 4); + let mut c = client.clone(); let req3 = c.hey(context::current(), "Tim".to_string()); - assert_eq!(3, req1.await.unwrap()); - assert_eq!(7, req2.await.unwrap()); - assert_eq!("Hey, Tim.", req3.await.unwrap()); + assert_matches!(req1.await, Ok(3)); + assert_matches!(req2.await, Ok(7)); + assert_matches!(req3.await, Ok(ref s) if s == "Hey, Tim."); + + Ok(()) } From 1b58914d59244c5451ea68fc765a647753599a20 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 30 Jul 2019 20:13:32 -0700 Subject: [PATCH 3/5] Move generated functions under their corresponding items. - fn serve -> Service::serve - fn new_stub -> Client::new This allows the generated function names to remain consistent across service definitions while preventing collisions. --- README.md | 11 +-- example-service/src/client.rs | 6 +- example-service/src/server.rs | 5 +- plugins/src/lib.rs | 107 +++++++++++------------- rpc/src/client/channel.rs | 12 ++- rpc/src/server/mod.rs | 103 +++++++++++++---------- rpc/src/transport/channel.rs | 8 +- rpc/src/util/deadline_compat.rs | 9 +- tarpc/examples/pubsub.rs | 10 ++- tarpc/examples/readme.rs | 8 +- tarpc/examples/server_calling_server.rs | 8 +- tarpc/tests/service_functional.rs | 12 +-- 12 files changed, 155 insertions(+), 144 deletions(-) diff --git a/README.md b/README.md index d639f63..e098a00 100644 --- a/README.md +++ b/README.md @@ -176,16 +176,13 @@ async fn main() -> io::Result<()> { // incoming() takes a stream of transports such as would be returned by // TcpListener::incoming (but a stream instead of an iterator). .incoming(stream::once(future::ready(server_transport))) - // serve_world is generated by the macro. It takes as input any type implementing - // the generated World trait. - .respond_with(serve_world(HelloServer)); + .respond_with(HelloServer.serve()); let _ = runtime::spawn(server); - // world_stub is generated by the macro. Like Server, it takes a config and any - // Transport as input, and returns a Client, also generated by the macro. - // by the service attribute. - let mut client = world_stub(client::Config::default(), client_transport).await?; + // WorldClient is generated by the macro. It has a constructor `new` that takes a config and + // any Transport as input + let mut client = WorldClient::new(client::Config::default(), client_transport).await?; // The client has an RPC method for each RPC defined in the annotated trait. It takes the same // args as defined, with the addition of a Context, which is always the first arg. The Context diff --git a/example-service/src/client.rs b/example-service/src/client.rs index 1d60fc9..5074b0e 100644 --- a/example-service/src/client.rs +++ b/example-service/src/client.rs @@ -44,9 +44,9 @@ async fn main() -> io::Result<()> { let transport = json_transport::connect(&server_addr).await?; - // world_stub is generated by the service attribute. Like Server, it takes a config and any - // Transport as input, and returns a Client, also generated by the attribute. - let mut client = service::world_stub(client::Config::default(), transport).await?; + // WorldClient is generated by the service attribute. It has a constructor `new` that takes a + // config and any Transport as input. + let mut client = service::WorldClient::new(client::Config::default(), transport).await?; // The client has an RPC method for each RPC defined in the annotated trait. It takes the same // args as defined, with the addition of a Context, which is always the first arg. The Context diff --git a/example-service/src/server.rs b/example-service/src/server.rs index 542505c..b60d7d2 100644 --- a/example-service/src/server.rs +++ b/example-service/src/server.rs @@ -11,6 +11,7 @@ use futures::{ future::{self, Ready}, prelude::*, }; +use service::World; use std::{io, net::SocketAddr}; use tarpc::{ context, @@ -22,7 +23,7 @@ use tarpc::{ #[derive(Clone)] struct HelloServer(SocketAddr); -impl service::World for HelloServer { +impl World for HelloServer { // Each defined rpc generates two items in the trait, a fn that serves the RPC, and // an associated type representing the future output by the fn. @@ -74,7 +75,7 @@ async fn main() -> io::Result<()> { // the generated World trait. .map(|channel| { let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap()); - channel.respond_with(service::serve_world(server)) + channel.respond_with(server.serve()) }) // Max 10 channels. .buffer_unordered(10) diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 0ec0d9e..96db027 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -135,6 +135,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { ident, rpcs, } = parse_macro_input!(input as Service); + let vis_repeated = std::iter::repeat(vis.clone()); let camel_case_fn_names: Vec = rpcs .iter() @@ -206,9 +207,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { let response_fut_ident = Ident::new(&response_fut_name, ident.span()); let response_fut_ident_repeated = std::iter::repeat(response_fut_ident.clone()); let response_fut_ident_repeated2 = response_fut_ident_repeated.clone(); - let snake_ident = camel_to_snake(&ident.to_string()); - let serve_ident = Ident::new(&format!("serve_{}", snake_ident), ident.span()); - let stub_ident = Ident::new(&format!("{}_stub", snake_ident), ident.span()); + let server_ident = Ident::new(&format!("{}Server", ident), ident.span()); #[cfg(feature = "serde1")] let derive_serialize = quote!(#[derive(serde::Serialize, serde::Deserialize)]); @@ -219,6 +218,35 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { #( #attrs )* #vis trait #ident: Clone + Send + 'static { #( #types_and_fns )* + + /// Returns a serving function to use with tarpc::server::Server. + fn serve(self) -> #server_ident { + #server_ident { service: self } + } + } + + #[derive(Clone)] + #vis struct #server_ident { + service: S, + } + + impl tarpc::server::Serve<#request_ident> for #server_ident + where S: #ident + { + type Resp = #response_ident; + type Fut = #response_fut_ident; + + fn serve(self, ctx: tarpc::context::Context, req: #request_ident) -> Self::Fut { + match req { + #( + #request_ident_repeated::#camel_case_idents{ #arg_vars } => { + #response_fut_ident_repeated2::#camel_case_idents2( + #service_name_repeated2::#method_names( + self.service, ctx, #arg_vars2)) + } + )* + } + } } /// The request sent over the wire from the client to the server. @@ -247,10 +275,10 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { } impl std::future::Future for #response_fut_ident { - type Output = std::io::Result<#response_ident>; + type Output = #response_ident; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) - -> std::task::Poll> + -> std::task::Poll<#response_ident> { unsafe { match std::pin::Pin::get_unchecked_mut(self) { @@ -258,44 +286,18 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { #response_fut_ident_repeated::#camel_case_idents(resp) => std::pin::Pin::new_unchecked(resp) .poll(cx) - .map(#response_ident_repeated::#camel_case_idents2) - .map(Ok), + .map(#response_ident_repeated::#camel_case_idents2), )* } } } } - /// Returns a serving function to use with tarpc::server::Server. - #vis fn #serve_ident(service: S) - -> impl FnOnce(tarpc::context::Context, #request_ident) -> #response_fut_ident + Send + 'static + Clone { - move |ctx, req| { - match req { - #( - #request_ident_repeated::#camel_case_idents{ #arg_vars } => { - #response_fut_ident_repeated2::#camel_case_idents2( - #service_name_repeated2::#method_names( - service.clone(), ctx, #arg_vars2)) - } - )* - } - } - } - #[allow(unused)] #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a Future interface. #vis struct #client_ident>(C); - /// Returns a new client stub that sends requests over the given transport. - #vis async fn #stub_ident(config: tarpc::client::Config, transport: T) - -> std::io::Result<#client_ident> - where - T: tarpc::Transport, tarpc::Response<#response_ident>> + Send + 'static, - { - Ok(#client_ident(tarpc::client::new(config, transport).await?)) - } - impl From for #client_ident where for <'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident> { @@ -304,13 +306,25 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { } } + impl #client_ident { + /// Returns a new client stub that sends requests over the given transport. + #vis async fn new(config: tarpc::client::Config, transport: T) + -> std::io::Result + where + T: tarpc::Transport, tarpc::Response<#response_ident>> + Send + 'static + { + Ok(#client_ident(tarpc::client::new(config, transport).await?)) + } + + } + impl #client_ident where for<'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident> { #( #[allow(unused)] #( #method_attrs )* - pub fn #method_names(&mut self, ctx: tarpc::context::Context, #args) + #vis_repeated fn #method_names(&mut self, ctx: tarpc::context::Context, #args) -> impl std::future::Future> + '_ { let request = #request_ident_repeated2::#camel_case_idents { #arg_vars }; let resp = tarpc::Client::call(&mut self.0, ctx, request); @@ -347,28 +361,6 @@ fn snake_to_camel(ident_str: &str) -> String { camel_ty } -// Really basic camel to snake that assumes capitals are always the start of a new segment. -fn camel_to_snake(ident_str: &str) -> String { - let mut snake = String::new(); - let mut chars = ident_str.chars(); - if let Some(c) = chars.next() { - snake.extend(c.to_lowercase()); - } - - while let Some(c) = chars.next() { - if c.is_uppercase() { - // New word - snake.push('_'); - snake.extend(c.to_lowercase()); - } else { - // Same word - snake.push(c) - } - } - - snake -} - #[test] fn snake_to_camel_basic() { assert_eq!(snake_to_camel("abc_def"), "AbcDef"); @@ -393,8 +385,3 @@ fn snake_to_camel_underscore_consecutive() { fn snake_to_camel_capital_in_middle() { assert_eq!(snake_to_camel("aBc_dEf"), "AbcDef"); } - -#[test] -fn camel_to_snake_basic() { - assert_eq!(camel_to_snake("AbcDef"), "abc_def"); -} diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index 0eaa126..c6bb428 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -180,7 +180,15 @@ impl Future for DispatchResponse { Poll::Ready(match resp { Ok(resp) => { self.complete = true; - Ok(resp.message?) + match resp { + Ok(resp) => Ok(resp.message?), + Err(oneshot::Canceled) => { + // The oneshot is Canceled when the dispatch task ends. In that case, + // there's nothing listening on the other side, so there's no point in + // propagating cancellation. + Err(io::Error::from(io::ErrorKind::ConnectionReset)) + } + } } Err(e) => Err({ let trace_id = *self.as_mut().ctx().trace_id(); @@ -211,7 +219,7 @@ impl Future for DispatchResponse { self.complete = true; io::Error::from(io::ErrorKind::ConnectionReset) } else { - panic!("[{}] Unrecognized deadline error: {}", trace_id, e) + panic!("[{}] Unrecognized deadline error: {:?}", trace_id, e) } }), }) diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 6885c2e..60c6124 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -23,7 +23,6 @@ use humantime::format_rfc3339; use log::{debug, error, info, trace, warn}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::{ - error::Error as StdError, fmt, hash::Hash, io, @@ -113,29 +112,29 @@ impl Server { /// The future driving the server. #[derive(Debug)] -pub struct Running { - incoming: S, - request_handler: F, +pub struct Running { + incoming: St, + server: Se, } -impl Running { - unsafe_pinned!(incoming: S); - unsafe_unpinned!(request_handler: F); +impl Running { + unsafe_pinned!(incoming: St); + unsafe_unpinned!(server: Se); } -impl Future for Running +impl Future for Running where - S: Sized + Stream, + St: Sized + Stream, C: Channel + Send + 'static, - F: FnOnce(context::Context, C::Req) -> Fut + Send + 'static + Clone, - Fut: Future> + Send + 'static, + Se: Serve + Send + 'static, + Se::Fut: Send + 'static { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) { if let Err(e) = - crate::spawn(channel.respond_with(self.as_mut().request_handler().clone())) + crate::spawn(channel.respond_with(self.as_mut().server().clone())) { warn!("Failed to spawn channel handler: {:?}", e); } @@ -145,6 +144,30 @@ where } } +/// Basically a Fn(Req) -> impl Future; +pub trait Serve: Sized + Clone { + /// Type of response. + type Resp; + + /// Type of response future. + type Fut: Future; + + /// Responds to a single request. + fn serve(self, ctx: context::Context, req: Req) -> Self::Fut; +} + +impl Serve for F +where F: FnOnce(context::Context, Req) -> Fut + Clone, + Fut: Future +{ + type Resp = Resp; + type Fut = Fut; + + fn serve(self, ctx: context::Context, req: Req) -> Self::Fut { + self(ctx, req) + } +} + /// A utility trait enabling a stream to fluently chain a request handler. pub trait Handler where @@ -165,15 +188,15 @@ where ThrottlerStream::new(self, n) } - /// Responds to all requests with `request_handler`. - fn respond_with(self, request_handler: F) -> Running + /// Responds to all requests with `server`. + fn respond_with(self, server: S) -> Running where - F: FnOnce(context::Context, C::Req) -> Fut + Send + 'static + Clone, - Fut: Future> + Send + 'static, + S: Serve + Send + 'static, + S::Fut: Send + 'static, { Running { incoming: self, - request_handler, + server, } } } @@ -291,10 +314,10 @@ where /// Respond to requests coming over the channel with `f`. Returns a future that drives the /// responses and resolves when the connection is closed. - fn respond_with(self, f: F) -> ResponseHandler + fn respond_with(self, server: S) -> ResponseHandler where - F: FnOnce(context::Context, Self::Req) -> Fut + Send + 'static + Clone, - Fut: Future> + Send + 'static, + S: Serve + Send + 'static, + S::Fut: Send + 'static, Self: Sized, { let (responses_tx, responses) = mpsc::channel(self.config().pending_response_buffer); @@ -302,7 +325,7 @@ where ResponseHandler { channel: self, - f, + server, pending_responses: responses, responses_tx, } @@ -406,7 +429,7 @@ where /// A running handler serving all requests coming over a channel. #[derive(Debug)] -pub struct ResponseHandler +pub struct ResponseHandler where C: Channel, { @@ -416,10 +439,10 @@ where /// Handed out to request handlers to fan in responses. responses_tx: mpsc::Sender<(context::Context, Response)>, /// Request handler. - f: F, + server: S, } -impl ResponseHandler +impl ResponseHandler where C: Channel, { @@ -428,14 +451,14 @@ where unsafe_pinned!(responses_tx: mpsc::Sender<(context::Context, Response)>); // For this to be safe, field f must be private, and code in this module must never // construct PinMut. - unsafe_unpinned!(f: F); + unsafe_unpinned!(server: S); } -impl ResponseHandler +impl ResponseHandler where C: Channel, - F: FnOnce(context::Context, C::Req) -> Fut + Send + 'static + Clone, - Fut: Future> + Send + 'static, + S: Serve + Send + 'static, + S::Fut: Send + 'static, { fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> { match ready!(self.as_mut().channel().poll_next(cx)?) { @@ -516,7 +539,7 @@ where let mut response_tx = self.as_mut().responses_tx().clone(); let trace_id = *ctx.trace_id(); - let response = self.as_mut().f().clone()(ctx, request); + let response = self.as_mut().server().clone().serve(ctx, request); let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then( move |result| { async move { @@ -550,11 +573,11 @@ where } } -impl Future for ResponseHandler +impl Future for ResponseHandler where C: Channel, - F: FnOnce(context::Context, C::Req) -> Fut + Send + 'static + Clone, - Fut: Future> + Send + 'static, + S: Serve + Send + 'static, + S::Fut: Send + 'static, { type Output = (); @@ -581,7 +604,7 @@ where } fn make_server_error( - e: timeout::Error, + e: timeout::Error<()>, trace_id: TraceId, deadline: SystemTime, ) -> ServerError { @@ -601,26 +624,20 @@ fn make_server_error( } } else if e.is_timer() { error!( - "[{}] Response failed because of an issue with a timer: {}", + "[{}] Response failed because of an issue with a timer: {:?}", trace_id, e ); ServerError { kind: io::ErrorKind::Other, - detail: Some(format!("{}", e)), - } - } else if e.is_inner() { - let e = e.into_inner().unwrap(); - ServerError { - kind: e.kind(), - detail: Some(e.description().into()), + detail: Some(format!("{:?}", e)), } } else { - error!("[{}] Unexpected response failure: {}", trace_id, e); + error!("[{}] Unexpected response failure: {:?}", trace_id, e); ServerError { kind: io::ErrorKind::Other, - detail: Some(format!("Server unexpectedly failed to respond: {}", e)), + detail: Some(format!("Server unexpectedly failed to respond: {:?}", e)), } } } diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index e2786c3..309d12b 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -93,9 +93,9 @@ mod tests { let (client_channel, server_channel) = transport::channel::unbounded(); crate::spawn( - Server::::default() + Server::default() .incoming(stream::once(future::ready(server_channel))) - .respond_with(|_ctx, request| { + .respond_with(|_ctx, request: String| { future::ready(request.parse::().map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, @@ -108,8 +108,8 @@ mod tests { let mut client = client::new(client::Config::default(), client_channel).await?; - let response1 = client.call(context::current(), "123".into()).await; - let response2 = client.call(context::current(), "abc".into()).await; + let response1 = client.call(context::current(), "123".into()).await?; + let response2 = client.call(context::current(), "abc".into()).await?; trace!("response1: {:?}, response2: {:?}", response1, response2); diff --git a/rpc/src/util/deadline_compat.rs b/rpc/src/util/deadline_compat.rs index c91f20d..7898df8 100644 --- a/rpc/src/util/deadline_compat.rs +++ b/rpc/src/util/deadline_compat.rs @@ -46,16 +46,15 @@ impl Deadline { } impl Future for Deadline where - T: TryFuture, + T: Future, { - type Output = Result>; + type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // First, try polling the future - match self.as_mut().future().try_poll(cx) { - Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), + match self.as_mut().future().poll(cx) { + Poll::Ready(v) => return Poll::Ready(Ok(v)), Poll::Pending => {} - Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))), } let delay = self.delay().poll_unpin(cx); diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index fdfbeba..85dfb9d 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -11,6 +11,7 @@ use futures::{ prelude::*, Future, }; +use publisher::Publisher as _; use rpc::{ client, context, server::{self, Handler}, @@ -23,6 +24,7 @@ use std::{ thread, time::Duration, }; +use subscriber::Subscriber as _; pub mod subscriber { #[tarpc::service] @@ -65,7 +67,7 @@ impl Subscriber { server::new(config) .incoming(incoming) .take(1) - .respond_with(subscriber::serve_subscriber(Subscriber { id })), + .respond_with(Subscriber { id }.serve()), ); Ok(addr) } @@ -114,7 +116,7 @@ impl publisher::Publisher for Publisher { ) -> io::Result<()> { let conn = bincode_transport::connect(&addr).await?; let subscriber = - subscriber::subscriber_stub(client::Config::default(), conn).await?; + subscriber::SubscriberClient::new(client::Config::default(), conn).await?; eprintln!("Subscribing {}.", id); clients.lock().unwrap().insert(id, subscriber); Ok(()) @@ -149,7 +151,7 @@ async fn main() -> io::Result<()> { transport .take(1) .map(server::BaseChannel::with_defaults) - .respond_with(publisher::serve_publisher(Publisher::new())), + .respond_with(Publisher::new().serve()), ); let subscriber1 = Subscriber::listen(0, server::Config::default()).await?; @@ -158,7 +160,7 @@ async fn main() -> io::Result<()> { let publisher_conn = bincode_transport::connect(&publisher_addr); let publisher_conn = publisher_conn.await?; let mut publisher = - publisher::publisher_stub(client::Config::default(), publisher_conn).await?; + publisher::PublisherClient::new(client::Config::default(), publisher_conn).await?; if let Err(e) = publisher .subscribe(context::current(), 0, subscriber1) diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index 1f5d17b..5d44be5 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -57,16 +57,16 @@ async fn main() -> io::Result<()> { BaseChannel::with_defaults(client) // serve_world is generated by the tarpc::service attribute. It takes as input any type // implementing the generated World trait. - .respond_with(serve_world(HelloServer)) + .respond_with(HelloServer.serve()) .await; }; let _ = runtime::spawn(server); let transport = bincode_transport::connect(&addr).await?; - // world_stub is generated by the tarpc::service attribute. Like Server, it takes a config and - // any Transport as input, and returns a Client, also generated by the attribute. - let mut client = world_stub(client::Config::default(), transport).await?; + // WorldClient is generated by the tarpc::service attribute. It has a constructor `new` that + // takes a config and any Transport as input. + let mut client = WorldClient::new(client::Config::default(), transport).await?; // The client has an RPC method for each RPC defined in the annotated trait. It takes the same // args as defined, with the addition of a Context, which is always the first arg. The Context diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 09cc166..025d36c 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -74,11 +74,11 @@ async fn main() -> io::Result<()> { let add_server = Server::default() .incoming(add_listener) .take(1) - .respond_with(add::serve_add(AddServer)); + .respond_with(AddServer.serve()); let _ = runtime::spawn(add_server); let to_add_server = bincode_transport::connect(&addr).await?; - let add_client = add::add_stub(client::Config::default(), to_add_server).await?; + let add_client = add::AddClient::new(client::Config::default(), to_add_server).await?; let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())? .filter_map(|r| future::ready(r.ok())); @@ -86,12 +86,12 @@ async fn main() -> io::Result<()> { let double_server = rpc::Server::default() .incoming(double_listener) .take(1) - .respond_with(double::serve_double(DoubleServer { add_client })); + .respond_with(DoubleServer { add_client }.serve()); let _ = runtime::spawn(double_server); let to_double_server = bincode_transport::connect(&addr).await?; let mut double_client = - double::double_stub(client::Config::default(), to_double_server).await?; + double::DoubleClient::new(client::Config::default(), to_double_server).await?; for i in 1..=5 { eprintln!("{:?}", double_client.double(context::current(), i).await?); diff --git a/tarpc/tests/service_functional.rs b/tarpc/tests/service_functional.rs index b9913d3..fa7f3b5 100644 --- a/tarpc/tests/service_functional.rs +++ b/tarpc/tests/service_functional.rs @@ -47,10 +47,10 @@ async fn sequential() -> io::Result<()> { let _ = runtime::spawn( BaseChannel::new(server::Config::default(), rx) - .respond_with(serve_service(Server)) + .respond_with(Server.serve()) ); - let mut client = service_stub(client::Config::default(), tx).await?; + let mut client = ServiceClient::new(client::Config::default(), tx).await?; assert_matches!(client.add(context::current(), 1, 2).await, Ok(3)); assert_matches!( @@ -70,11 +70,11 @@ async fn serde() -> io::Result<()> { let _ = runtime::spawn( tarpc::Server::default() .incoming(transport.take(1).filter_map(|r| async { r.ok() })) - .respond_with(serve_service(Server)), + .respond_with(Server.serve()), ); let transport = bincode_transport::connect(&addr).await?; - let mut client = service_stub(client::Config::default(), transport).await?; + let mut client = ServiceClient::new(client::Config::default(), transport).await?; assert_matches!(client.add(context::current(), 1, 2).await, Ok(3)); assert_matches!( @@ -93,10 +93,10 @@ async fn concurrent() -> io::Result<()> { let _ = runtime::spawn( rpc::Server::default() .incoming(stream::once(ready(rx))) - .respond_with(serve_service(Server)), + .respond_with(Server.serve()), ); - let client = service_stub(client::Config::default(), tx).await?; + let client = ServiceClient::new(client::Config::default(), tx).await?; let mut c = client.clone(); let req1 = c.add(context::current(), 1, 2); From e48e6dfe67ffff75d8e89723ebbe8c443a1e78e8 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 30 Jul 2019 21:31:22 -0700 Subject: [PATCH 4/5] Add nice error message for ident collisions --- plugins/src/lib.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 96db027..5ea9180 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -46,10 +46,24 @@ impl Parse for Service { let ident = input.parse()?; let content; braced!(content in input); - let mut rpcs = Vec::new(); + let mut rpcs = Vec::::new(); while !content.is_empty() { rpcs.push(content.parse()?); } + for rpc in &rpcs { + if rpc.ident == "new" { + return Err(syn::Error::new( + rpc.ident.span(), + format!("method name conflicts with generated fn `{}Client::new`", ident) + )) + } + if rpc.ident == "serve" { + return Err(syn::Error::new( + rpc.ident.span(), + format!("method name conflicts with generated fn `{}::serve`", ident) + )) + } + } Ok(Service { attrs, vis, @@ -83,19 +97,19 @@ impl Parse for RpcMethod { FnArg::SelfRef(self_ref) => { return Err(syn::Error::new( self_ref.span(), - "RPC args cannot start with self", + "method args cannot start with self", )) } FnArg::SelfValue(self_val) => { return Err(syn::Error::new( self_val.span(), - "RPC args cannot start with self", + "method args cannot start with self", )) } arg => { return Err(syn::Error::new( arg.span(), - "RPC args must be explicitly typed patterns", + "method args must be explicitly typed patterns", )) } }) From 22ef6b7800ba284c99e1590de117737a936e5896 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 30 Jul 2019 21:46:16 -0700 Subject: [PATCH 5/5] Choose a slightly less obvious name for Serve impl. To hopefully avoid most collisions. --- plugins/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 5ea9180..5cb9269 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -221,7 +221,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { let response_fut_ident = Ident::new(&response_fut_name, ident.span()); let response_fut_ident_repeated = std::iter::repeat(response_fut_ident.clone()); let response_fut_ident_repeated2 = response_fut_ident_repeated.clone(); - let server_ident = Ident::new(&format!("{}Server", ident), ident.span()); + let server_ident = Ident::new(&format!("Serve{}", ident), ident.span()); #[cfg(feature = "serde1")] let derive_serialize = quote!(#[derive(serde::Serialize, serde::Deserialize)]);