diff --git a/.travis.yml b/.travis.yml index 7f582f5..a4ded2f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,3 +11,4 @@ os: script: - cargo test --all-targets --all-features - cargo test --doc --all-features + - cargo run --example 2>&1 | grep ' ' | awk '{print $1}' | xargs -L 1 cargo run --all-features --example diff --git a/README.md b/README.md index 5d07d22..dc52e99 100644 --- a/README.md +++ b/README.md @@ -76,9 +76,10 @@ use std::io; // This is the service definition. It looks a lot like a trait definition. // It defines one RPC, hello, which takes one arg, name, and returns a String. -tarpc::service! { +#[tarpc::service] +trait Service { /// Returns a greeting for name. - rpc hello(name: String) -> String; + async fn hello(name: String) -> String; } ``` @@ -102,9 +103,10 @@ implement it for our Server struct. # # // This is the service definition. It looks a lot like a trait definition. # // It defines one RPC, hello, which takes one arg, name, and returns a String. -# tarpc::service! { +# #[tarpc::service] +# trait Service { # /// Returns a greeting for name. -# rpc hello(name: String) -> String; +# async fn hello(name: String) -> String; # } # // This is the type that implements the generated Service trait. It is the business logic @@ -147,9 +149,10 @@ that uses bincode over TCP. # # // This is the service definition. It looks a lot like a trait definition. # // It defines one RPC, hello, which takes one arg, name, and returns a String. -# tarpc::service! { +# #[tarpc::service] +# trait Service { # /// Returns a greeting for name. -# rpc hello(name: String) -> String; +# async fn hello(name: String) -> String; # } # # // This is the type that implements the generated Service trait. It is the business logic @@ -198,7 +201,6 @@ async fn main() -> io::Result<()> { } ``` - ## Service Documentation Use `cargo doc` as you normally would to see the documentation created for all diff --git a/example-service/src/lib.rs b/example-service/src/lib.rs index 01c43ca..1bda58a 100644 --- a/example-service/src/lib.rs +++ b/example-service/src/lib.rs @@ -4,15 +4,12 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(async_await, proc_macro_hygiene)] +#![feature(async_await)] -// This is the service definition. It looks a lot like a trait definition. -// It defines one RPC, hello, which takes one arg, name, and returns a String. -tarpc::service! { +/// This is the service definition. It looks a lot like a trait definition. +/// It defines one RPC, hello, which takes one arg, name, and returns a String. +#[tarpc::service] +pub trait Service { /// Returns a greeting for name. - rpc hello(#[serde(default = "default_name")] name: String) -> String; -} - -fn default_name() -> String { - "DefaultName".into() + async fn hello(name: String) -> String; } diff --git a/hooks/pre-push b/hooks/pre-push index 57d5e87..2f1bce1 100755 --- a/hooks/pre-push +++ b/hooks/pre-push @@ -91,7 +91,11 @@ if [ "$?" == 0 ]; then try_run "Building ... " cargo build --color=always try_run "Testing ... " cargo test --color=always - try_run "Doc Test ... " cargo clean && cargo build --tests && rustdoc --test README.md --edition 2018 -L target/debug/deps -Z unstable-options + try_run "Testing with all features enabled ... " cargo test --all-features --color=always + for EXAMPLE in $(cargo run --example 2>&1 | grep ' ' | awk '{print $1}') + do + try_run "Running example \"$EXAMPLE\" ... " cargo run --all-features --example $EXAMPLE + done fi diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index a7fd78c..0398608 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -2,6 +2,7 @@ name = "tarpc-plugins" version = "0.5.1" authors = ["Adam Wright ", "Tim Kuehn "] +edition = "2018" license = "MIT" documentation = "https://docs.rs/tarpc-plugins" homepage = "https://github.com/google/tarpc" @@ -11,6 +12,9 @@ categories = ["asynchronous", "network-programming"] readme = "../README.md" description = "Proc macros for tarpc." +[features] +serde1 = [] + [badges] travis-ci = { repository = "google/tarpc" } @@ -22,3 +26,17 @@ proc-macro2 = "0.4" [lib] proc-macro = true + +[dev-dependencies] +bincode = "1" +bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" } +bytes = { version = "0.4", features = ["serde"] } +futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +humantime = "1.0" +env_logger = "0.6" +serde = { version = "1.0", features = ["derive"] } +tarpc = { path = "../tarpc" } +tokio = "0.1" +tokio-executor = "0.1" +tokio-tcp = "0.1" +pin-utils = "0.1.0-alpha.4" diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 1109c9a..4d5ef41 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -4,87 +4,286 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. +#![recursion_limit = "512"] + extern crate itertools; extern crate proc_macro; extern crate proc_macro2; extern crate quote; extern crate syn; +use proc_macro2::TokenStream as TokenStream2; use proc_macro::TokenStream; - use itertools::Itertools; -use proc_macro2::Span; -use quote::ToTokens; -use std::str::FromStr; -use syn::{parse, Ident, TraitItemType, TypePath}; +use quote::quote; +use syn::{parse_macro_input, parenthesized, braced, Attribute, Ident, FnArg, ArgCaptured, + ReturnType, Pat, Token, Visibility, + parse::{Parse, ParseStream}, + punctuated::Punctuated, + spanned::Spanned, + token::Comma}; -#[proc_macro] -pub fn snake_to_camel(input: TokenStream) -> TokenStream { - let i = input.clone(); - let mut assoc_type = parse::(input) - .unwrap_or_else(|_| panic!("Could not parse trait item from:\n{}", i)); +struct Service { + attrs: Vec, + vis: Visibility, + ident: Ident, + rpcs: Vec, +} - let old_ident = convert(&mut assoc_type.ident); +struct RpcMethod { + attrs: Vec, + ident: Ident, + args: Punctuated, + output: ReturnType, +} - for mut attr in &mut assoc_type.attrs { - if let Some(pair) = attr.path.segments.first() { - if pair.value().ident == "doc" { - attr.tts = proc_macro2::TokenStream::from_str( - &attr.tts.to_string().replace("{}", &old_ident), - ) - .unwrap(); - } +impl Parse for Service { + fn parse(input: ParseStream) -> syn::Result { + let attrs = input.call(Attribute::parse_outer)?; + let vis = input.parse()?; + input.parse::()?; + let ident = input.parse()?; + let content; + braced!(content in input); + let mut rpcs = Vec::new(); + while !content.is_empty() { + rpcs.push(content.parse()?); + } + Ok(Service { attrs, vis, ident, rpcs }) + } +} + +impl Parse for RpcMethod { + fn parse(input: ParseStream) -> syn::Result { + let attrs = input.call(Attribute::parse_outer)?; + input.parse::()?; + input.parse::()?; + let ident = input.parse()?; + let content; + parenthesized!(content in input); + let args: Punctuated = content.parse_terminated(FnArg::parse)?; + let args = args.into_iter().map(|arg| match arg { + FnArg::Captured(captured) => match captured.pat { + Pat::Ident(_) => Ok(captured), + _ => return Err(syn::Error::new( + captured.pat.span(), "patterns aren't allowed in RPC args")) + }, + FnArg::SelfRef(self_ref) => return Err(syn::Error::new( + self_ref.span(), "RPC args cannot start with self")), + FnArg::SelfValue(self_val) => return Err(syn::Error::new( + self_val.span(), "RPC args cannot start with self")), + arg => return Err(syn::Error::new( + arg.span(), "RPC args must be explicitly typed patterns")), + }) + .collect::>()?; + let output = input.parse()?; + input.parse::()?; + + Ok(RpcMethod { attrs, ident, args, output, }) } } - assoc_type.into_token_stream().into() +#[proc_macro_attribute] +pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream { + struct EmptyArgs; + impl Parse for EmptyArgs { + fn parse(_: ParseStream) -> syn::Result { + Ok(EmptyArgs) + } + } + parse_macro_input!(attr as EmptyArgs); + + let Service { attrs, vis, ident, rpcs } = parse_macro_input!(input as Service); + + let camel_case_fn_names: Vec = rpcs.iter() + .map(|rpc| convert_str(&rpc.ident.to_string())) + .collect(); + let ref outputs: Vec = rpcs.iter().map(|rpc| match rpc.output { + ReturnType::Type(_, ref ty) => quote!(#ty), + ReturnType::Default => quote!(()), + }) + .collect(); + let future_types: Vec = camel_case_fn_names.iter() + .map(|name| Ident::new(&format!("{}Fut", name), ident.span())) + .collect(); + let ref camel_case_idents: Vec = rpcs.iter().zip(camel_case_fn_names.iter()) + .map(|(rpc, name)| Ident::new(name, rpc.ident.span())) + .collect(); + let camel_case_idents2 = camel_case_idents; + + let ref args: Vec<&Punctuated> = rpcs.iter().map(|rpc| &rpc.args).collect(); + let ref arg_vars: Vec> = + args.iter() + .map(|args| args.iter().map(|arg| &arg.pat).collect()) + .collect(); + let arg_vars2 = arg_vars; + let ref method_names: Vec<&Ident> = rpcs.iter().map(|rpc| &rpc.ident).collect(); + let method_attrs: Vec<_> = rpcs.iter().map(|rpc| &rpc.attrs).collect(); + + let types_and_fns = rpcs.iter() + .zip(future_types.iter()) + .zip(outputs.iter()) + .map(|((RpcMethod { attrs, ident, args, .. }, future_type), output)| { + let ty_doc = format!("The response future returned by {}.", ident); + quote! { + #[doc = #ty_doc] + type #future_type: std::future::Future; + + #( #attrs )* + fn #ident(self, context: tarpc::context::Context, #args) -> Self::#future_type; + } + }); + + let service_name_repeated = std::iter::repeat(ident.clone()); + let service_name_repeated2 = service_name_repeated.clone(); + + let client_ident = Ident::new(&format!("{}Client", ident), ident.span()); + + #[cfg(feature = "serde1")] + let derive_serialize = quote!(#[derive(serde::Serialize, serde::Deserialize)]); + #[cfg(not(feature = "serde1"))] + let derive_serialize = quote!(); + + let tokens = quote! { + #( #attrs )* + #vis trait #ident: Clone + Send + 'static { + #( #types_and_fns )* + } + + /// The request sent over the wire from the client to the server. + #[derive(Debug)] + #derive_serialize + #vis enum Request { + #( #camel_case_idents{ #args } ),* + } + + /// The response sent over the wire from the server to the client. + #[derive(Debug)] + #derive_serialize + #vis enum Response { + #( #camel_case_idents(#outputs) ),* + } + + /// A future resolving to a server [`Response`]. + #vis enum ResponseFut { + #( #camel_case_idents(::#future_types) ),* + } + + impl std::fmt::Debug for ResponseFut { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct("ResponseFut").finish() + } + } + + impl std::future::Future for ResponseFut { + type Output = std::io::Result; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) + -> std::task::Poll> + { + unsafe { + match std::pin::Pin::get_unchecked_mut(self) { + #( + ResponseFut::#camel_case_idents(resp) => + std::pin::Pin::new_unchecked(resp) + .poll(cx) + .map(Response::#camel_case_idents2) + .map(Ok), + )* + } + } + } + } + + /// Returns a serving function to use with tarpc::server::Server. + #vis fn serve(service: S) + -> impl FnOnce(tarpc::context::Context, Request) -> ResponseFut + Send + 'static + Clone { + move |ctx, req| { + match req { + #( + Request::#camel_case_idents{ #arg_vars } => { + ResponseFut::#camel_case_idents2( + #service_name_repeated2::#method_names( + service.clone(), ctx, #arg_vars2)) + } + )* + } + } + } + + #[allow(unused)] + #[derive(Clone, Debug)] + /// The client stub that makes RPC calls to the server. Exposes a Future interface. + #vis struct #client_ident>(C); + + /// Returns a new client stub that sends requests over the given transport. + #vis async fn new_stub(config: tarpc::client::Config, transport: T) + -> std::io::Result<#client_ident> + where + T: tarpc::Transport, tarpc::Response> + Send + 'static, + { + Ok(#client_ident(tarpc::client::new(config, transport).await?)) + } + + impl From for #client_ident + where for <'a> C: tarpc::Client<'a, Request, Response = Response> + { + fn from(client: C) -> Self { + #client_ident(client) + } + } + + impl #client_ident + where for<'a> C: tarpc::Client<'a, Request, Response = Response> + { + #( + #[allow(unused)] + #( #method_attrs )* + pub fn #method_names(&mut self, ctx: tarpc::context::Context, #args) + -> impl std::future::Future> + '_ { + let request = Request::#camel_case_idents { #arg_vars }; + let resp = tarpc::Client::call(&mut self.0, ctx, request); + async move { + match resp.await? { + Response::#camel_case_idents2(msg) => std::result::Result::Ok(msg), + _ => unreachable!(), + } + } + } + )* + } + }; + + tokens.into() } -#[proc_macro] -pub fn ty_snake_to_camel(input: TokenStream) -> TokenStream { - let mut path = parse::(input).unwrap(); - - // Only capitalize the final segment - convert(&mut path.path.segments.last_mut().unwrap().into_value().ident); - - path.into_token_stream().into() -} - -/// Converts an ident in-place to CamelCase and returns the previous ident. -fn convert(ident: &mut Ident) -> String { - let ident_str = ident.to_string(); +fn convert_str(ident_str: &str) -> String { let mut camel_ty = String::new(); - { - // Find the first non-underscore and add it capitalized. - let mut chars = ident_str.chars(); + // Find the first non-underscore and add it capitalized. + let mut chars = ident_str.chars(); - // Find the first non-underscore char, uppercase it, and append it. - // Guaranteed to succeed because all idents must have at least one non-underscore char. - camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase()); + // Find the first non-underscore char, uppercase it, and append it. + // Guaranteed to succeed because all idents must have at least one non-underscore char. + camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase()); - // When we find an underscore, we remove it and capitalize the next char. To do this, - // we need to ensure the next char is not another underscore. - let mut chars = chars.coalesce(|c1, c2| { - if c1 == '_' && c2 == '_' { - Ok(c1) - } else { - Err((c1, c2)) - } - }); + // When we find an underscore, we remove it and capitalize the next char. To do this, + // we need to ensure the next char is not another underscore. + let mut chars = chars.coalesce(|c1, c2| { + if c1 == '_' && c2 == '_' { + Ok(c1) + } else { + Err((c1, c2)) + } + }); - while let Some(c) = chars.next() { - if c != '_' { - camel_ty.push(c); - } else if let Some(c) = chars.next() { - camel_ty.extend(c.to_uppercase()); - } + while let Some(c) = chars.next() { + if c != '_' { + camel_ty.push(c); + } else if let Some(c) = chars.next() { + camel_ty.extend(c.to_uppercase()); } } - // The Fut suffix is hardcoded right now; this macro isn't really meant to be general-purpose. - camel_ty.push_str("Fut"); - - *ident = Ident::new(&camel_ty, Span::call_site()); - ident_str + camel_ty } diff --git a/plugins/tests/service.rs b/plugins/tests/service.rs new file mode 100644 index 0000000..2c51251 --- /dev/null +++ b/plugins/tests/service.rs @@ -0,0 +1,55 @@ +#![feature(async_await)] + +use tarpc::context; + +#[test] +fn att_service_trait() { + use futures::future::{ready, Ready}; + + #[tarpc_plugins::service] + trait Foo { + async fn two_part(s: String, i: i32) -> (String, i32); + async fn bar(s: String) -> String; + async fn baz(); + } + + impl Foo for () { + type TwoPartFut = Ready<(String, i32)>; + fn two_part(self, _: context::Context, s: String, i: i32) -> Self::TwoPartFut { + ready((s, i)) + } + + type BarFut = Ready; + fn bar(self, _: context::Context, s: String) -> Self::BarFut { + ready(s) + } + + type BazFut = Ready<()>; + fn baz(self, _: context::Context) -> Self::BazFut { + ready(()) + } + } +} + +#[test] +fn syntax() { + #[tarpc_plugins::service] + trait Syntax { + #[deny(warnings)] + #[allow(non_snake_case)] + async fn TestCamelCaseDoesntConflict(); + async fn hello() -> String; + #[doc="attr"] + async fn attr(s: String) -> String; + async fn no_args_no_return(); + async fn no_args() -> (); + async fn one_arg(foo: String) -> i32; + async fn two_args_no_return(bar: String, baz: u64); + async fn two_args(bar: String, baz: u64) -> String; + async fn no_args_ret_error() -> i32; + async fn one_arg_ret_error(foo: String) -> String; + async fn no_arg_implicit_return_error(); + #[doc="attr"] + async fn one_arg_implicit_return_error(foo: String); + } +} diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index a2b05f4..933de98 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -13,25 +13,42 @@ readme = "../README.md" description = "An RPC framework for Rust with a focus on ease of use." [features] -serde1 = ["rpc/serde1", "serde", "serde/derive"] +serde1 = ["rpc/serde1", "tarpc-plugins/serde1", "serde", "serde/derive"] [badges] travis-ci = { repository = "google/tarpc" } [dependencies] -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } -log = "0.4" -runtime = "0.3.0-alpha.6" serde = { optional = true, version = "1.0" } rpc = { package = "tarpc-lib", path = "../rpc", version = "0.6" } tarpc-plugins = { path = "../plugins", version = "0.5.0" } [dev-dependencies] -bincode = "1" -bytes = { version = "0.4", features = ["serde"] } -humantime = "1.0" +assert_matches = "1.0" +bincode = "1.0" bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" } +bytes = { version = "0.4", features = ["serde"] } +futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +humantime = "1.0" env_logger = "0.6" +runtime = "0.3.0-alpha.6" runtime-tokio = "0.3.0-alpha.5" tokio-tcp = "0.1" pin-utils = "0.1.0-alpha.4" + +[[example]] +name = "service_registry" +required-features = ["serde1"] + +[[example]] +name = "server_calling_server" +required-features = ["serde1"] + +[[example]] +name = "readme" +required-features = ["serde1"] + +[[example]] +name = "pubsub" +required-features = ["serde1"] + diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 1df5ffb..35117c2 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -4,9 +4,10 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(async_await, existential_type, proc_macro_hygiene)] +#![feature(async_await, existential_type)] use futures::{ + compat::Executor01CompatExt, future::{self, Ready}, prelude::*, Future, @@ -25,17 +26,23 @@ use std::{ }; pub mod subscriber { - tarpc::service! { - rpc receive(message: String); + pub use ServiceClient as Client; + + #[tarpc::service] + pub trait Service { + async fn receive(message: String); } } pub mod publisher { + pub use ServiceClient as Client; use std::net::SocketAddr; - tarpc::service! { - rpc broadcast(message: String); - rpc subscribe(id: u32, address: SocketAddr) -> Result<(), String>; - rpc unsubscribe(id: u32); + + #[tarpc::service] + pub trait Service { + async fn broadcast(message: String); + async fn subscribe(id: u32, address: SocketAddr) -> Result<(), String>; + async fn unsubscribe(id: u32); } } @@ -48,7 +55,7 @@ impl subscriber::Service for Subscriber { type ReceiveFut = Ready<()>; fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut { - println!("{} received message: {}", self.id, message); + eprintln!("{} received message: {}", self.id, message); future::ready(()) } } @@ -108,7 +115,7 @@ impl publisher::Service for Publisher { ) -> io::Result<()> { let conn = bincode_transport::connect(&addr).await?; let subscriber = subscriber::new_stub(client::Config::default(), conn).await?; - println!("Subscribing {}.", id); + eprintln!("Subscribing {}.", id); clients.lock().unwrap().insert(id, subscriber); Ok(()) } @@ -119,7 +126,7 @@ impl publisher::Service for Publisher { existential type UnsubscribeFut: Future; fn unsubscribe(self, _: context::Context, id: u32) -> Self::UnsubscribeFut { - println!("Unsubscribing {}", id); + eprintln!("Unsubscribing {}", id); let mut clients = self.clients.lock().unwrap(); if let None = clients.remove(&id) { eprintln!( diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index 5f07e0b..2aa2204 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(async_await, proc_macro_hygiene)] +#![feature(async_await)] use futures::{ future::{self, Ready}, @@ -19,8 +19,9 @@ use std::io; // This is the service definition. It looks a lot like a trait definition. // It defines one RPC, hello, which takes one arg, name, and returns a String. -tarpc::service! { - rpc hello(name: String) -> String; +#[tarpc::service] +pub trait Service { + async fn hello(name: String) -> String; } // This is the type that implements the generated Service trait. It is the business logic @@ -46,18 +47,20 @@ async fn main() -> io::Result<()> { let mut transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?; let addr = transport.local_addr(); - // For this example, we're just going to wait for one connection. - let client = transport.next().await.unwrap()?; - - // `Channel` is a trait representing a server-side connection. It is a trait to allow - // for some channels to be instrumented: for example, to track the number of open connections. - // BaseChannel is the most basic channel, simply wrapping a transport with no added - // functionality. - let server = BaseChannel::with_defaults(client) - // serve is generated by the tarpc::service! macro. It takes as input any type implementing - // the generated Service trait. - .respond_with(serve(HelloServer)); + let server = async move { + // For this example, we're just going to wait for one connection. + let client = transport.next().await.unwrap().unwrap(); + // `Channel` is a trait representing a server-side connection. It is a trait to allow + // for some channels to be instrumented: for example, to track the number of open connections. + // BaseChannel is the most basic channel, simply wrapping a transport with no added + // functionality. + BaseChannel::with_defaults(client) + // serve is generated by the tarpc::service! macro. It takes as input any type implementing + // the generated Service trait. + .respond_with(serve(HelloServer)) + .await; + }; let _ = runtime::spawn(server); let transport = bincode_transport::connect(&addr).await?; @@ -72,7 +75,7 @@ async fn main() -> io::Result<()> { // specifies a deadline and trace information which can be helpful in debugging requests. let hello = client.hello(context::current(), "Stim".to_string()).await?; - println!("{}", hello); + eprintln!("{}", hello); Ok(()) } diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index 0f0a8a4..90b0b96 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. -#![feature(existential_type, async_await, proc_macro_hygiene)] +#![feature(existential_type, async_await)] use crate::{add::Service as AddService, double::Service as DoubleService}; use futures::{ @@ -18,16 +18,22 @@ use rpc::{ use std::io; pub mod add { - tarpc::service! { + pub use ServiceClient as Client; + + #[tarpc::service] + pub trait Service { /// Add two ints together. - rpc add(x: i32, y: i32) -> i32; + async fn add(x: i32, y: i32) -> i32; } } pub mod double { - tarpc::service! { + pub use ServiceClient as Client; + + #[tarpc::service] + pub trait Service { /// 2 * x - rpc double(x: i32) -> Result; + async fn double(x: i32) -> Result; } } @@ -91,7 +97,7 @@ async fn main() -> io::Result<()> { let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?; for i in 1..=5 { - println!("{:?}", double_client.double(context::current(), i).await?); + eprintln!("{:?}", double_client.double(context::current(), i).await?); } Ok(()) } diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs index 4e923f0..59f68c4 100644 --- a/tarpc/examples/service_registry.rs +++ b/tarpc/examples/service_registry.rs @@ -1,4 +1,4 @@ -#![feature(async_await, proc_macro_hygiene)] +#![feature(async_await)] mod registry { use bytes::Bytes; @@ -260,14 +260,20 @@ where } mod write_service { - tarpc::service! { - rpc write(key: String, value: String); + pub use ServiceClient as Client; + + #[tarpc::service] + pub trait Service { + async fn write(key: String, value: String); } } mod read_service { - tarpc::service! { - rpc read(key: String) -> Option; + pub use ServiceClient as Client; + + #[tarpc::service] + pub trait Service { + async fn read(key: String) -> Option; } } @@ -390,7 +396,7 @@ async fn main() -> io::Result<()> { let val = read_client .read(context::current(), "key".to_string()) .await?; - println!("{:?}", val); + eprintln!("{:?}", val); Ok(()) } diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 35e1be6..e5948a9 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -9,15 +9,29 @@ #![feature(async_await, external_doc)] #![cfg_attr(test, feature(proc_macro_hygiene))] -#[doc(hidden)] -pub use futures; +/// The main macro that creates RPC services. +/// +/// Rpc methods are specified, mirroring trait syntax: +/// +/// ``` +/// # #![feature(async_await, proc_macro_hygiene)] +/// # fn main() {} +/// #[tarpc::service] +/// trait Service { +/// /// Say hello +/// async fn hello(name: String) -> String; +/// } +/// ``` +/// +/// Attributes can be attached to each rpc. These attributes +/// will then be attached to the generated service traits' +/// corresponding `fn`s, as well as to the client stubs' RPCs. +/// +/// The following items are expanded in the enclosing module: +/// +/// * `trait Service` -- defines the RPC service. +/// * `fn serve` -- turns a service impl into a request handler. +/// * `Client` -- a client stub with a fn for each RPC. +/// * `fn new_stub` -- creates a new Client stub. +pub use tarpc_plugins::service; pub use rpc::*; -#[cfg(feature = "serde")] -#[doc(hidden)] -pub use serde; -#[doc(hidden)] -pub use tarpc_plugins::*; - -/// Provides the macro used for constructing rpc services and client stubs. -#[macro_use] -mod macros; diff --git a/tarpc/tests/service_functional.rs b/tarpc/tests/service_functional.rs new file mode 100644 index 0000000..9f5d346 --- /dev/null +++ b/tarpc/tests/service_functional.rs @@ -0,0 +1,105 @@ +#![feature(async_await)] + +use assert_matches::assert_matches; +use futures::{ + future::{ready, Ready}, + prelude::*, +}; +use tarpc::{client, context, server::Handler, transport::channel}; +#[cfg(feature = "serde1")] +use std::io; + + +#[tarpc_plugins::service] +trait Service { + async fn add(x: i32, y: i32) -> i32; + async fn hey(name: String) -> String; +} + +#[derive(Clone)] +struct Server; + +impl Service for Server { + type AddFut = Ready; + + fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut { + ready(x + y) + } + + type HeyFut = Ready; + + fn hey(self, _: context::Context, name: String) -> Self::HeyFut { + ready(format!("Hey, {}.", name)) + } +} + +#[runtime::test(runtime_tokio::TokioCurrentThread)] +async fn sequential() { + let _ = env_logger::try_init(); + + let (tx, rx) = channel::unbounded(); + let _ = runtime::spawn( + tarpc::Server::default() + .incoming(stream::once(ready(rx))) + .respond_with(serve(Server)), + ); + + let mut client = new_stub(client::Config::default(), tx).await.unwrap(); + assert_eq!(3, client.add(context::current(), 1, 2).await.unwrap()); + assert_eq!( + "Hey, Tim.", + client + .hey(context::current(), "Tim".to_string()) + .await + .unwrap() + ); +} + +#[cfg(feature = "serde1")] +#[runtime::test(runtime_tokio::TokioCurrentThread)] +async fn serde() -> io::Result<()> { + let _ = env_logger::try_init(); + + let transport = bincode_transport::listen(&([0, 0, 0, 0], 56789).into())?; + let addr = transport.local_addr(); + let _ = runtime::spawn( + tarpc::Server::default() + .incoming(transport.take(1).filter_map(|r| async { r.ok() })) + .respond_with(serve(Server)) + ); + + let transport = bincode_transport::connect(&addr).await?; + let mut client = new_stub(client::Config::default(), transport).await?; + + assert_matches!(client.add(context::current(), 1, 2).await, Ok(3)); + assert_matches!( + client.hey(context::current(), "Tim".to_string()).await, + Ok(ref s) if s == "Hey, Tim." + ); + + Ok(()) +} + +#[runtime::test(runtime_tokio::TokioCurrentThread)] +async fn concurrent() { + let _ = env_logger::try_init(); + + let (tx, rx) = channel::unbounded(); + let _ = runtime::spawn( + rpc::Server::default() + .incoming(stream::once(ready(rx))) + .respond_with(serve(Server)), + ); + + let client = new_stub(client::Config::default(), tx).await.unwrap(); + let mut c = client.clone(); + let req1 = c.add(context::current(), 1, 2); + let mut c = client.clone(); + let req2 = c.add(context::current(), 3, 4); + let mut c = client.clone(); + let req3 = c.hey(context::current(), "Tim".to_string()); + + assert_eq!(3, req1.await.unwrap()); + assert_eq!(7, req2.await.unwrap()); + assert_eq!("Hey, Tim.", req3.await.unwrap()); +}