Rewrite to use proc_macro_attribute

This commit is contained in:
Tim Kuehn
2019-07-20 06:13:33 -07:00
committed by Tim
parent 49f2641e3c
commit abb0b5b3ac
14 changed files with 565 additions and 131 deletions

View File

@@ -11,3 +11,4 @@ os:
script:
- cargo test --all-targets --all-features
- cargo test --doc --all-features
- cargo run --example 2>&1 | grep ' ' | awk '{print $1}' | xargs -L 1 cargo run --all-features --example

View File

@@ -76,9 +76,10 @@ use std::io;
// This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String.
tarpc::service! {
#[tarpc::service]
trait Service {
/// Returns a greeting for name.
rpc hello(name: String) -> String;
async fn hello(name: String) -> String;
}
```
@@ -102,9 +103,10 @@ implement it for our Server struct.
#
# // This is the service definition. It looks a lot like a trait definition.
# // It defines one RPC, hello, which takes one arg, name, and returns a String.
# tarpc::service! {
# #[tarpc::service]
# trait Service {
# /// Returns a greeting for name.
# rpc hello(name: String) -> String;
# async fn hello(name: String) -> String;
# }
#
// This is the type that implements the generated Service trait. It is the business logic
@@ -147,9 +149,10 @@ that uses bincode over TCP.
#
# // This is the service definition. It looks a lot like a trait definition.
# // It defines one RPC, hello, which takes one arg, name, and returns a String.
# tarpc::service! {
# #[tarpc::service]
# trait Service {
# /// Returns a greeting for name.
# rpc hello(name: String) -> String;
# async fn hello(name: String) -> String;
# }
#
# // This is the type that implements the generated Service trait. It is the business logic
@@ -198,7 +201,6 @@ async fn main() -> io::Result<()> {
}
```
## Service Documentation
Use `cargo doc` as you normally would to see the documentation created for all

View File

@@ -4,15 +4,12 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(async_await, proc_macro_hygiene)]
#![feature(async_await)]
// This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String.
tarpc::service! {
/// This is the service definition. It looks a lot like a trait definition.
/// It defines one RPC, hello, which takes one arg, name, and returns a String.
#[tarpc::service]
pub trait Service {
/// Returns a greeting for name.
rpc hello(#[serde(default = "default_name")] name: String) -> String;
}
fn default_name() -> String {
"DefaultName".into()
async fn hello(name: String) -> String;
}

View File

@@ -91,7 +91,11 @@ if [ "$?" == 0 ]; then
try_run "Building ... " cargo build --color=always
try_run "Testing ... " cargo test --color=always
try_run "Doc Test ... " cargo clean && cargo build --tests && rustdoc --test README.md --edition 2018 -L target/debug/deps -Z unstable-options
try_run "Testing with all features enabled ... " cargo test --all-features --color=always
for EXAMPLE in $(cargo run --example 2>&1 | grep ' ' | awk '{print $1}')
do
try_run "Running example \"$EXAMPLE\" ... " cargo run --all-features --example $EXAMPLE
done
fi

View File

@@ -2,6 +2,7 @@
name = "tarpc-plugins"
version = "0.5.1"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"
documentation = "https://docs.rs/tarpc-plugins"
homepage = "https://github.com/google/tarpc"
@@ -11,6 +12,9 @@ categories = ["asynchronous", "network-programming"]
readme = "../README.md"
description = "Proc macros for tarpc."
[features]
serde1 = []
[badges]
travis-ci = { repository = "google/tarpc" }
@@ -22,3 +26,17 @@ proc-macro2 = "0.4"
[lib]
proc-macro = true
[dev-dependencies]
bincode = "1"
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
bytes = { version = "0.4", features = ["serde"] }
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
humantime = "1.0"
env_logger = "0.6"
serde = { version = "1.0", features = ["derive"] }
tarpc = { path = "../tarpc" }
tokio = "0.1"
tokio-executor = "0.1"
tokio-tcp = "0.1"
pin-utils = "0.1.0-alpha.4"

View File

@@ -4,87 +4,286 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![recursion_limit = "512"]
extern crate itertools;
extern crate proc_macro;
extern crate proc_macro2;
extern crate quote;
extern crate syn;
use proc_macro2::TokenStream as TokenStream2;
use proc_macro::TokenStream;
use itertools::Itertools;
use proc_macro2::Span;
use quote::ToTokens;
use std::str::FromStr;
use syn::{parse, Ident, TraitItemType, TypePath};
use quote::quote;
use syn::{parse_macro_input, parenthesized, braced, Attribute, Ident, FnArg, ArgCaptured,
ReturnType, Pat, Token, Visibility,
parse::{Parse, ParseStream},
punctuated::Punctuated,
spanned::Spanned,
token::Comma};
#[proc_macro]
pub fn snake_to_camel(input: TokenStream) -> TokenStream {
let i = input.clone();
let mut assoc_type = parse::<TraitItemType>(input)
.unwrap_or_else(|_| panic!("Could not parse trait item from:\n{}", i));
struct Service {
attrs: Vec<Attribute>,
vis: Visibility,
ident: Ident,
rpcs: Vec<RpcMethod>,
}
let old_ident = convert(&mut assoc_type.ident);
struct RpcMethod {
attrs: Vec<Attribute>,
ident: Ident,
args: Punctuated<ArgCaptured, Comma>,
output: ReturnType,
}
for mut attr in &mut assoc_type.attrs {
if let Some(pair) = attr.path.segments.first() {
if pair.value().ident == "doc" {
attr.tts = proc_macro2::TokenStream::from_str(
&attr.tts.to_string().replace("{}", &old_ident),
)
.unwrap();
}
impl Parse for Service {
fn parse(input: ParseStream) -> syn::Result<Self> {
let attrs = input.call(Attribute::parse_outer)?;
let vis = input.parse()?;
input.parse::<Token![trait]>()?;
let ident = input.parse()?;
let content;
braced!(content in input);
let mut rpcs = Vec::new();
while !content.is_empty() {
rpcs.push(content.parse()?);
}
Ok(Service { attrs, vis, ident, rpcs })
}
}
impl Parse for RpcMethod {
fn parse(input: ParseStream) -> syn::Result<Self> {
let attrs = input.call(Attribute::parse_outer)?;
input.parse::<Token![async]>()?;
input.parse::<Token![fn]>()?;
let ident = input.parse()?;
let content;
parenthesized!(content in input);
let args: Punctuated<FnArg, Comma> = content.parse_terminated(FnArg::parse)?;
let args = args.into_iter().map(|arg| match arg {
FnArg::Captured(captured) => match captured.pat {
Pat::Ident(_) => Ok(captured),
_ => return Err(syn::Error::new(
captured.pat.span(), "patterns aren't allowed in RPC args"))
},
FnArg::SelfRef(self_ref) => return Err(syn::Error::new(
self_ref.span(), "RPC args cannot start with self")),
FnArg::SelfValue(self_val) => return Err(syn::Error::new(
self_val.span(), "RPC args cannot start with self")),
arg => return Err(syn::Error::new(
arg.span(), "RPC args must be explicitly typed patterns")),
})
.collect::<Result<_, _>>()?;
let output = input.parse()?;
input.parse::<Token![;]>()?;
Ok(RpcMethod { attrs, ident, args, output, })
}
}
assoc_type.into_token_stream().into()
#[proc_macro_attribute]
pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
struct EmptyArgs;
impl Parse for EmptyArgs {
fn parse(_: ParseStream) -> syn::Result<Self> {
Ok(EmptyArgs)
}
}
parse_macro_input!(attr as EmptyArgs);
let Service { attrs, vis, ident, rpcs } = parse_macro_input!(input as Service);
let camel_case_fn_names: Vec<String> = rpcs.iter()
.map(|rpc| convert_str(&rpc.ident.to_string()))
.collect();
let ref outputs: Vec<TokenStream2> = rpcs.iter().map(|rpc| match rpc.output {
ReturnType::Type(_, ref ty) => quote!(#ty),
ReturnType::Default => quote!(()),
})
.collect();
let future_types: Vec<Ident> = camel_case_fn_names.iter()
.map(|name| Ident::new(&format!("{}Fut", name), ident.span()))
.collect();
let ref camel_case_idents: Vec<Ident> = rpcs.iter().zip(camel_case_fn_names.iter())
.map(|(rpc, name)| Ident::new(name, rpc.ident.span()))
.collect();
let camel_case_idents2 = camel_case_idents;
let ref args: Vec<&Punctuated<ArgCaptured, Comma>> = rpcs.iter().map(|rpc| &rpc.args).collect();
let ref arg_vars: Vec<Punctuated<&Pat, Comma>> =
args.iter()
.map(|args| args.iter().map(|arg| &arg.pat).collect())
.collect();
let arg_vars2 = arg_vars;
let ref method_names: Vec<&Ident> = rpcs.iter().map(|rpc| &rpc.ident).collect();
let method_attrs: Vec<_> = rpcs.iter().map(|rpc| &rpc.attrs).collect();
let types_and_fns = rpcs.iter()
.zip(future_types.iter())
.zip(outputs.iter())
.map(|((RpcMethod { attrs, ident, args, .. }, future_type), output)| {
let ty_doc = format!("The response future returned by {}.", ident);
quote! {
#[doc = #ty_doc]
type #future_type: std::future::Future<Output = #output>;
#( #attrs )*
fn #ident(self, context: tarpc::context::Context, #args) -> Self::#future_type;
}
});
let service_name_repeated = std::iter::repeat(ident.clone());
let service_name_repeated2 = service_name_repeated.clone();
let client_ident = Ident::new(&format!("{}Client", ident), ident.span());
#[cfg(feature = "serde1")]
let derive_serialize = quote!(#[derive(serde::Serialize, serde::Deserialize)]);
#[cfg(not(feature = "serde1"))]
let derive_serialize = quote!();
let tokens = quote! {
#( #attrs )*
#vis trait #ident: Clone + Send + 'static {
#( #types_and_fns )*
}
/// The request sent over the wire from the client to the server.
#[derive(Debug)]
#derive_serialize
#vis enum Request {
#( #camel_case_idents{ #args } ),*
}
/// The response sent over the wire from the server to the client.
#[derive(Debug)]
#derive_serialize
#vis enum Response {
#( #camel_case_idents(#outputs) ),*
}
/// A future resolving to a server [`Response`].
#vis enum ResponseFut<S: #ident> {
#( #camel_case_idents(<S as #service_name_repeated>::#future_types) ),*
}
impl<S: #ident> std::fmt::Debug for ResponseFut<S> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("ResponseFut").finish()
}
}
impl<S: #ident> std::future::Future for ResponseFut<S> {
type Output = std::io::Result<Response>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>)
-> std::task::Poll<std::io::Result<Response>>
{
unsafe {
match std::pin::Pin::get_unchecked_mut(self) {
#(
ResponseFut::#camel_case_idents(resp) =>
std::pin::Pin::new_unchecked(resp)
.poll(cx)
.map(Response::#camel_case_idents2)
.map(Ok),
)*
}
}
}
}
/// Returns a serving function to use with tarpc::server::Server.
#vis fn serve<S: #ident>(service: S)
-> impl FnOnce(tarpc::context::Context, Request) -> ResponseFut<S> + Send + 'static + Clone {
move |ctx, req| {
match req {
#(
Request::#camel_case_idents{ #arg_vars } => {
ResponseFut::#camel_case_idents2(
#service_name_repeated2::#method_names(
service.clone(), ctx, #arg_vars2))
}
)*
}
}
}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
#vis struct #client_ident<C = tarpc::client::Channel<Request, Response>>(C);
/// Returns a new client stub that sends requests over the given transport.
#vis async fn new_stub<T>(config: tarpc::client::Config, transport: T)
-> std::io::Result<#client_ident>
where
T: tarpc::Transport<tarpc::ClientMessage<Request>, tarpc::Response<Response>> + Send + 'static,
{
Ok(#client_ident(tarpc::client::new(config, transport).await?))
}
impl<C> From<C> for #client_ident<C>
where for <'a> C: tarpc::Client<'a, Request, Response = Response>
{
fn from(client: C) -> Self {
#client_ident(client)
}
}
impl<C> #client_ident<C>
where for<'a> C: tarpc::Client<'a, Request, Response = Response>
{
#(
#[allow(unused)]
#( #method_attrs )*
pub fn #method_names(&mut self, ctx: tarpc::context::Context, #args)
-> impl std::future::Future<Output = std::io::Result<#outputs>> + '_ {
let request = Request::#camel_case_idents { #arg_vars };
let resp = tarpc::Client::call(&mut self.0, ctx, request);
async move {
match resp.await? {
Response::#camel_case_idents2(msg) => std::result::Result::Ok(msg),
_ => unreachable!(),
}
}
}
)*
}
};
tokens.into()
}
#[proc_macro]
pub fn ty_snake_to_camel(input: TokenStream) -> TokenStream {
let mut path = parse::<TypePath>(input).unwrap();
// Only capitalize the final segment
convert(&mut path.path.segments.last_mut().unwrap().into_value().ident);
path.into_token_stream().into()
}
/// Converts an ident in-place to CamelCase and returns the previous ident.
fn convert(ident: &mut Ident) -> String {
let ident_str = ident.to_string();
fn convert_str(ident_str: &str) -> String {
let mut camel_ty = String::new();
{
// Find the first non-underscore and add it capitalized.
let mut chars = ident_str.chars();
// Find the first non-underscore and add it capitalized.
let mut chars = ident_str.chars();
// Find the first non-underscore char, uppercase it, and append it.
// Guaranteed to succeed because all idents must have at least one non-underscore char.
camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase());
// Find the first non-underscore char, uppercase it, and append it.
// Guaranteed to succeed because all idents must have at least one non-underscore char.
camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase());
// When we find an underscore, we remove it and capitalize the next char. To do this,
// we need to ensure the next char is not another underscore.
let mut chars = chars.coalesce(|c1, c2| {
if c1 == '_' && c2 == '_' {
Ok(c1)
} else {
Err((c1, c2))
}
});
// When we find an underscore, we remove it and capitalize the next char. To do this,
// we need to ensure the next char is not another underscore.
let mut chars = chars.coalesce(|c1, c2| {
if c1 == '_' && c2 == '_' {
Ok(c1)
} else {
Err((c1, c2))
}
});
while let Some(c) = chars.next() {
if c != '_' {
camel_ty.push(c);
} else if let Some(c) = chars.next() {
camel_ty.extend(c.to_uppercase());
}
while let Some(c) = chars.next() {
if c != '_' {
camel_ty.push(c);
} else if let Some(c) = chars.next() {
camel_ty.extend(c.to_uppercase());
}
}
// The Fut suffix is hardcoded right now; this macro isn't really meant to be general-purpose.
camel_ty.push_str("Fut");
*ident = Ident::new(&camel_ty, Span::call_site());
ident_str
camel_ty
}

55
plugins/tests/service.rs Normal file
View File

@@ -0,0 +1,55 @@
#![feature(async_await)]
use tarpc::context;
#[test]
fn att_service_trait() {
use futures::future::{ready, Ready};
#[tarpc_plugins::service]
trait Foo {
async fn two_part(s: String, i: i32) -> (String, i32);
async fn bar(s: String) -> String;
async fn baz();
}
impl Foo for () {
type TwoPartFut = Ready<(String, i32)>;
fn two_part(self, _: context::Context, s: String, i: i32) -> Self::TwoPartFut {
ready((s, i))
}
type BarFut = Ready<String>;
fn bar(self, _: context::Context, s: String) -> Self::BarFut {
ready(s)
}
type BazFut = Ready<()>;
fn baz(self, _: context::Context) -> Self::BazFut {
ready(())
}
}
}
#[test]
fn syntax() {
#[tarpc_plugins::service]
trait Syntax {
#[deny(warnings)]
#[allow(non_snake_case)]
async fn TestCamelCaseDoesntConflict();
async fn hello() -> String;
#[doc="attr"]
async fn attr(s: String) -> String;
async fn no_args_no_return();
async fn no_args() -> ();
async fn one_arg(foo: String) -> i32;
async fn two_args_no_return(bar: String, baz: u64);
async fn two_args(bar: String, baz: u64) -> String;
async fn no_args_ret_error() -> i32;
async fn one_arg_ret_error(foo: String) -> String;
async fn no_arg_implicit_return_error();
#[doc="attr"]
async fn one_arg_implicit_return_error(foo: String);
}
}

View File

@@ -13,25 +13,42 @@ readme = "../README.md"
description = "An RPC framework for Rust with a focus on ease of use."
[features]
serde1 = ["rpc/serde1", "serde", "serde/derive"]
serde1 = ["rpc/serde1", "tarpc-plugins/serde1", "serde", "serde/derive"]
[badges]
travis-ci = { repository = "google/tarpc" }
[dependencies]
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
log = "0.4"
runtime = "0.3.0-alpha.6"
serde = { optional = true, version = "1.0" }
rpc = { package = "tarpc-lib", path = "../rpc", version = "0.6" }
tarpc-plugins = { path = "../plugins", version = "0.5.0" }
[dev-dependencies]
bincode = "1"
bytes = { version = "0.4", features = ["serde"] }
humantime = "1.0"
assert_matches = "1.0"
bincode = "1.0"
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
bytes = { version = "0.4", features = ["serde"] }
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
humantime = "1.0"
env_logger = "0.6"
runtime = "0.3.0-alpha.6"
runtime-tokio = "0.3.0-alpha.5"
tokio-tcp = "0.1"
pin-utils = "0.1.0-alpha.4"
[[example]]
name = "service_registry"
required-features = ["serde1"]
[[example]]
name = "server_calling_server"
required-features = ["serde1"]
[[example]]
name = "readme"
required-features = ["serde1"]
[[example]]
name = "pubsub"
required-features = ["serde1"]

View File

@@ -4,9 +4,10 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(async_await, existential_type, proc_macro_hygiene)]
#![feature(async_await, existential_type)]
use futures::{
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
Future,
@@ -25,17 +26,23 @@ use std::{
};
pub mod subscriber {
tarpc::service! {
rpc receive(message: String);
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
async fn receive(message: String);
}
}
pub mod publisher {
pub use ServiceClient as Client;
use std::net::SocketAddr;
tarpc::service! {
rpc broadcast(message: String);
rpc subscribe(id: u32, address: SocketAddr) -> Result<(), String>;
rpc unsubscribe(id: u32);
#[tarpc::service]
pub trait Service {
async fn broadcast(message: String);
async fn subscribe(id: u32, address: SocketAddr) -> Result<(), String>;
async fn unsubscribe(id: u32);
}
}
@@ -48,7 +55,7 @@ impl subscriber::Service for Subscriber {
type ReceiveFut = Ready<()>;
fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut {
println!("{} received message: {}", self.id, message);
eprintln!("{} received message: {}", self.id, message);
future::ready(())
}
}
@@ -108,7 +115,7 @@ impl publisher::Service for Publisher {
) -> io::Result<()> {
let conn = bincode_transport::connect(&addr).await?;
let subscriber = subscriber::new_stub(client::Config::default(), conn).await?;
println!("Subscribing {}.", id);
eprintln!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
Ok(())
}
@@ -119,7 +126,7 @@ impl publisher::Service for Publisher {
existential type UnsubscribeFut: Future<Output = ()>;
fn unsubscribe(self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
println!("Unsubscribing {}", id);
eprintln!("Unsubscribing {}", id);
let mut clients = self.clients.lock().unwrap();
if let None = clients.remove(&id) {
eprintln!(

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(async_await, proc_macro_hygiene)]
#![feature(async_await)]
use futures::{
future::{self, Ready},
@@ -19,8 +19,9 @@ use std::io;
// This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String.
tarpc::service! {
rpc hello(name: String) -> String;
#[tarpc::service]
pub trait Service {
async fn hello(name: String) -> String;
}
// This is the type that implements the generated Service trait. It is the business logic
@@ -46,18 +47,20 @@ async fn main() -> io::Result<()> {
let mut transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = transport.local_addr();
// For this example, we're just going to wait for one connection.
let client = transport.next().await.unwrap()?;
// `Channel` is a trait representing a server-side connection. It is a trait to allow
// for some channels to be instrumented: for example, to track the number of open connections.
// BaseChannel is the most basic channel, simply wrapping a transport with no added
// functionality.
let server = BaseChannel::with_defaults(client)
// serve is generated by the tarpc::service! macro. It takes as input any type implementing
// the generated Service trait.
.respond_with(serve(HelloServer));
let server = async move {
// For this example, we're just going to wait for one connection.
let client = transport.next().await.unwrap().unwrap();
// `Channel` is a trait representing a server-side connection. It is a trait to allow
// for some channels to be instrumented: for example, to track the number of open connections.
// BaseChannel is the most basic channel, simply wrapping a transport with no added
// functionality.
BaseChannel::with_defaults(client)
// serve is generated by the tarpc::service! macro. It takes as input any type implementing
// the generated Service trait.
.respond_with(serve(HelloServer))
.await;
};
let _ = runtime::spawn(server);
let transport = bincode_transport::connect(&addr).await?;
@@ -72,7 +75,7 @@ async fn main() -> io::Result<()> {
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello);
eprintln!("{}", hello);
Ok(())
}

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
#![feature(existential_type, async_await, proc_macro_hygiene)]
#![feature(existential_type, async_await)]
use crate::{add::Service as AddService, double::Service as DoubleService};
use futures::{
@@ -18,16 +18,22 @@ use rpc::{
use std::io;
pub mod add {
tarpc::service! {
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
/// Add two ints together.
rpc add(x: i32, y: i32) -> i32;
async fn add(x: i32, y: i32) -> i32;
}
}
pub mod double {
tarpc::service! {
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
/// 2 * x
rpc double(x: i32) -> Result<i32, String>;
async fn double(x: i32) -> Result<i32, String>;
}
}
@@ -91,7 +97,7 @@ async fn main() -> io::Result<()> {
let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?;
for i in 1..=5 {
println!("{:?}", double_client.double(context::current(), i).await?);
eprintln!("{:?}", double_client.double(context::current(), i).await?);
}
Ok(())
}

View File

@@ -1,4 +1,4 @@
#![feature(async_await, proc_macro_hygiene)]
#![feature(async_await)]
mod registry {
use bytes::Bytes;
@@ -260,14 +260,20 @@ where
}
mod write_service {
tarpc::service! {
rpc write(key: String, value: String);
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
async fn write(key: String, value: String);
}
}
mod read_service {
tarpc::service! {
rpc read(key: String) -> Option<String>;
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
async fn read(key: String) -> Option<String>;
}
}
@@ -390,7 +396,7 @@ async fn main() -> io::Result<()> {
let val = read_client
.read(context::current(), "key".to_string())
.await?;
println!("{:?}", val);
eprintln!("{:?}", val);
Ok(())
}

View File

@@ -9,15 +9,29 @@
#![feature(async_await, external_doc)]
#![cfg_attr(test, feature(proc_macro_hygiene))]
#[doc(hidden)]
pub use futures;
/// The main macro that creates RPC services.
///
/// Rpc methods are specified, mirroring trait syntax:
///
/// ```
/// # #![feature(async_await, proc_macro_hygiene)]
/// # fn main() {}
/// #[tarpc::service]
/// trait Service {
/// /// Say hello
/// async fn hello(name: String) -> String;
/// }
/// ```
///
/// Attributes can be attached to each rpc. These attributes
/// will then be attached to the generated service traits'
/// corresponding `fn`s, as well as to the client stubs' RPCs.
///
/// The following items are expanded in the enclosing module:
///
/// * `trait Service` -- defines the RPC service.
/// * `fn serve` -- turns a service impl into a request handler.
/// * `Client` -- a client stub with a fn for each RPC.
/// * `fn new_stub` -- creates a new Client stub.
pub use tarpc_plugins::service;
pub use rpc::*;
#[cfg(feature = "serde")]
#[doc(hidden)]
pub use serde;
#[doc(hidden)]
pub use tarpc_plugins::*;
/// Provides the macro used for constructing rpc services and client stubs.
#[macro_use]
mod macros;

View File

@@ -0,0 +1,105 @@
#![feature(async_await)]
use assert_matches::assert_matches;
use futures::{
future::{ready, Ready},
prelude::*,
};
use tarpc::{client, context, server::Handler, transport::channel};
#[cfg(feature = "serde1")]
use std::io;
#[tarpc_plugins::service]
trait Service {
async fn add(x: i32, y: i32) -> i32;
async fn hey(name: String) -> String;
}
#[derive(Clone)]
struct Server;
impl Service for Server {
type AddFut = Ready<i32>;
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
ready(x + y)
}
type HeyFut = Ready<String>;
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
ready(format!("Hey, {}.", name))
}
}
#[runtime::test(runtime_tokio::TokioCurrentThread)]
async fn sequential() {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
let _ = runtime::spawn(
tarpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(serve(Server)),
);
let mut client = new_stub(client::Config::default(), tx).await.unwrap();
assert_eq!(3, client.add(context::current(), 1, 2).await.unwrap());
assert_eq!(
"Hey, Tim.",
client
.hey(context::current(), "Tim".to_string())
.await
.unwrap()
);
}
#[cfg(feature = "serde1")]
#[runtime::test(runtime_tokio::TokioCurrentThread)]
async fn serde() -> io::Result<()> {
let _ = env_logger::try_init();
let transport = bincode_transport::listen(&([0, 0, 0, 0], 56789).into())?;
let addr = transport.local_addr();
let _ = runtime::spawn(
tarpc::Server::default()
.incoming(transport.take(1).filter_map(|r| async { r.ok() }))
.respond_with(serve(Server))
);
let transport = bincode_transport::connect(&addr).await?;
let mut client = new_stub(client::Config::default(), transport).await?;
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
assert_matches!(
client.hey(context::current(), "Tim".to_string()).await,
Ok(ref s) if s == "Hey, Tim."
);
Ok(())
}
#[runtime::test(runtime_tokio::TokioCurrentThread)]
async fn concurrent() {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
let _ = runtime::spawn(
rpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(serve(Server)),
);
let client = new_stub(client::Config::default(), tx).await.unwrap();
let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone();
let req2 = c.add(context::current(), 3, 4);
let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string());
assert_eq!(3, req1.await.unwrap());
assert_eq!(7, req2.await.unwrap());
assert_eq!("Hey, Tim.", req3.await.unwrap());
}