Merge pull request #248 from tikue/service-idents

With this change, the service definitions don't need to be isolated in their own modules.

Given:

```rust
#[tarpc::service]
trait World { ... }
```

Before this would generate the following items
------
- `trait World`
- `fn serve`
- `struct Client`
- `fn new_stub`

`// Implementation details below`
- `enum Request`
- `enum Response`
- `enum ResponseFut`

And now these items
------
- `trait World {    ...    fn serve }`
- `struct WorldClient ... impl WorldClient {    ...    async fn new }`

`// Implementation details below`
- `enum WorldRequest`
- `enum WorldResponse`
- `enum WorldResponseFut`
- `struct ServeWorld` (new manual closure impl because you can't use impl Trait in trait fns)
```
This commit is contained in:
Tim
2019-08-05 12:23:35 -07:00
committed by GitHub
17 changed files with 317 additions and 242 deletions

View File

@@ -36,12 +36,9 @@ Add to your `Cargo.toml` dependencies:
tarpc = "0.18.0"
```
The `service!` macro expands to a collection of items that form an rpc service.
In the above example, the macro is called within the `hello_service` module.
This module will contain a `Client` stub and `Service` trait. These generated
types make it easy and ergonomic to write servers without dealing with
serialization directly. Simply implement one of the generated traits, and you're
off to the races!
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
These generated types make it easy and ergonomic to write servers with less boilerplate.
Simply implement the generated service trait, and you're off to the races!
## Example
@@ -77,13 +74,13 @@ use std::io;
// This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String.
#[tarpc::service]
trait Service {
trait World {
/// Returns a greeting for name.
async fn hello(name: String) -> String;
}
```
This service definition generates a trait called `Service`. Next we need to
This service definition generates a trait called `World`. Next we need to
implement it for our Server struct.
```rust
@@ -104,17 +101,17 @@ implement it for our Server struct.
# // This is the service definition. It looks a lot like a trait definition.
# // It defines one RPC, hello, which takes one arg, name, and returns a String.
# #[tarpc::service]
# trait Service {
# trait World {
# /// Returns a greeting for name.
# async fn hello(name: String) -> String;
# }
#
// This is the type that implements the generated Service trait. It is the business logic
// This is the type that implements the generated World trait. It is the business logic
// and is used to start the server.
#[derive(Clone)]
struct HelloServer;
impl Service for HelloServer {
impl World for HelloServer {
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
// an associated type representing the future output by the fn.
@@ -150,17 +147,17 @@ that uses bincode over TCP.
# // This is the service definition. It looks a lot like a trait definition.
# // It defines one RPC, hello, which takes one arg, name, and returns a String.
# #[tarpc::service]
# trait Service {
# trait World {
# /// Returns a greeting for name.
# async fn hello(name: String) -> String;
# }
#
# // This is the type that implements the generated Service trait. It is the business logic
# // This is the type that implements the generated World trait. It is the business logic
# // and is used to start the server.
# #[derive(Clone)]
# struct HelloServer;
#
# impl Service for HelloServer {
# impl World for HelloServer {
# // Each defined rpc generates two items in the trait, a fn that serves the RPC, and
# // an associated type representing the future output by the fn.
#
@@ -179,19 +176,16 @@ async fn main() -> io::Result<()> {
// incoming() takes a stream of transports such as would be returned by
// TcpListener::incoming (but a stream instead of an iterator).
.incoming(stream::once(future::ready(server_transport)))
// serve is generated by the service! macro. It takes as input any type implementing
// the generated Service trait.
.respond_with(serve(HelloServer));
.respond_with(HelloServer.serve());
let _ = runtime::spawn(server);
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = new_stub(client::Config::default(), client_transport).await?;
// WorldClient is generated by the macro. It has a constructor `new` that takes a config and
// any Transport as input
let mut client = WorldClient::new(client::Config::default(), client_transport).await?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;

View File

@@ -13,7 +13,6 @@ readme = "../README.md"
description = "A bincode-based transport for tarpc services."
[dependencies]
bincode = "1"
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
futures_legacy = { version = "0.1", package = "futures" }
pin-utils = "0.1.0-alpha.4"

View File

@@ -44,13 +44,12 @@ async fn main() -> io::Result<()> {
let transport = json_transport::connect(&server_addr).await?;
// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = service::new_stub(client::Config::default(), transport).await?;
// WorldClient is generated by the service attribute. It has a constructor `new` that takes a
// config and any Transport as input.
let mut client = service::WorldClient::new(client::Config::default(), transport).await?;
// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), name).await?;

View File

@@ -9,7 +9,7 @@
/// This is the service definition. It looks a lot like a trait definition.
/// It defines one RPC, hello, which takes one arg, name, and returns a String.
#[tarpc::service]
pub trait Service {
pub trait World {
/// Returns a greeting for name.
async fn hello(name: String) -> String;
}

View File

@@ -11,18 +11,19 @@ use futures::{
future::{self, Ready},
prelude::*,
};
use service::World;
use std::{io, net::SocketAddr};
use tarpc::{
context,
server::{self, Channel, Handler},
};
// This is the type that implements the generated Service trait. It is the business logic
// This is the type that implements the generated World trait. It is the business logic
// and is used to start the server.
#[derive(Clone)]
struct HelloServer(SocketAddr);
impl service::Service for HelloServer {
impl World for HelloServer {
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
// an associated type representing the future output by the fn.
@@ -70,11 +71,11 @@ async fn main() -> io::Result<()> {
.map(server::BaseChannel::with_defaults)
// Limit channels to 1 per IP.
.max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip())
// serve is generated by the service! macro. It takes as input any type implementing
// the generated Service trait.
// serve is generated by the service attribute. It takes as input any type implementing
// the generated World trait.
.map(|channel| {
let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap());
channel.respond_with(service::serve(server))
channel.respond_with(server.serve())
})
// Max 10 channels.
.buffer_unordered(10)

View File

@@ -19,8 +19,7 @@ serde1 = []
travis-ci = { repository = "google/tarpc" }
[dependencies]
itertools = "0.8"
syn = { version = "0.15", features = ["full", "extra-traits"] }
syn = { version = "0.15", features = ["full"] }
quote = "0.6"
proc-macro2 = "0.4"
@@ -28,15 +27,6 @@ proc-macro2 = "0.4"
proc-macro = true
[dev-dependencies]
bincode = "1"
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
bytes = { version = "0.4", features = ["serde"] }
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
humantime = "1.0"
env_logger = "0.6"
serde = { version = "1.0", features = ["derive"] }
tarpc = { path = "../tarpc" }
tokio = "0.1"
tokio-executor = "0.1"
tokio-tcp = "0.1"
pin-utils = "0.1.0-alpha.4"

View File

@@ -6,13 +6,11 @@
#![recursion_limit = "512"]
extern crate itertools;
extern crate proc_macro;
extern crate proc_macro2;
extern crate quote;
extern crate syn;
use itertools::Itertools;
use proc_macro::TokenStream;
use proc_macro2::TokenStream as TokenStream2;
use quote::quote;
@@ -48,10 +46,24 @@ impl Parse for Service {
let ident = input.parse()?;
let content;
braced!(content in input);
let mut rpcs = Vec::new();
let mut rpcs = Vec::<RpcMethod>::new();
while !content.is_empty() {
rpcs.push(content.parse()?);
}
for rpc in &rpcs {
if rpc.ident == "new" {
return Err(syn::Error::new(
rpc.ident.span(),
format!("method name conflicts with generated fn `{}Client::new`", ident)
))
}
if rpc.ident == "serve" {
return Err(syn::Error::new(
rpc.ident.span(),
format!("method name conflicts with generated fn `{}::serve`", ident)
))
}
}
Ok(Service {
attrs,
vis,
@@ -85,19 +97,19 @@ impl Parse for RpcMethod {
FnArg::SelfRef(self_ref) => {
return Err(syn::Error::new(
self_ref.span(),
"RPC args cannot start with self",
"method args cannot start with self",
))
}
FnArg::SelfValue(self_val) => {
return Err(syn::Error::new(
self_val.span(),
"RPC args cannot start with self",
"method args cannot start with self",
))
}
arg => {
return Err(syn::Error::new(
arg.span(),
"RPC args must be explicitly typed patterns",
"method args must be explicitly typed patterns",
))
}
})
@@ -114,6 +126,13 @@ impl Parse for RpcMethod {
}
}
/// Generates:
/// - service trait
/// - serve fn
/// - client stub struct
/// - new_stub client factory fn
/// - Request and Response enums
/// - ResponseFut Future
#[proc_macro_attribute]
pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
struct EmptyArgs;
@@ -130,10 +149,11 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
ident,
rpcs,
} = parse_macro_input!(input as Service);
let vis_repeated = std::iter::repeat(vis.clone());
let camel_case_fn_names: Vec<String> = rpcs
.iter()
.map(|rpc| convert_str(&rpc.ident.to_string()))
.map(|rpc| snake_to_camel(&rpc.ident.to_string()))
.collect();
let ref outputs: Vec<TokenStream2> = rpcs
.iter()
@@ -191,6 +211,17 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
let service_name_repeated2 = service_name_repeated.clone();
let client_ident = Ident::new(&format!("{}Client", ident), ident.span());
let request_ident = Ident::new(&format!("{}Request", ident), ident.span());
let request_ident_repeated = std::iter::repeat(request_ident.clone());
let request_ident_repeated2 = request_ident_repeated.clone();
let response_ident = Ident::new(&format!("{}Response", ident), ident.span());
let response_ident_repeated = std::iter::repeat(response_ident.clone());
let response_ident_repeated2 = response_ident_repeated.clone();
let response_fut_name = format!("{}ResponseFut", ident);
let response_fut_ident = Ident::new(&response_fut_name, ident.span());
let response_fut_ident_repeated = std::iter::repeat(response_fut_ident.clone());
let response_fut_ident_repeated2 = response_fut_ident_repeated.clone();
let server_ident = Ident::new(&format!("Serve{}", ident), ident.span());
#[cfg(feature = "serde1")]
let derive_serialize = quote!(#[derive(serde::Serialize, serde::Deserialize)]);
@@ -201,104 +232,119 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
#( #attrs )*
#vis trait #ident: Clone + Send + 'static {
#( #types_and_fns )*
/// Returns a serving function to use with tarpc::server::Server.
fn serve(self) -> #server_ident<Self> {
#server_ident { service: self }
}
}
#[derive(Clone)]
#vis struct #server_ident<S> {
service: S,
}
impl<S> tarpc::server::Serve<#request_ident> for #server_ident<S>
where S: #ident
{
type Resp = #response_ident;
type Fut = #response_fut_ident<S>;
fn serve(self, ctx: tarpc::context::Context, req: #request_ident) -> Self::Fut {
match req {
#(
#request_ident_repeated::#camel_case_idents{ #arg_vars } => {
#response_fut_ident_repeated2::#camel_case_idents2(
#service_name_repeated2::#method_names(
self.service, ctx, #arg_vars2))
}
)*
}
}
}
/// The request sent over the wire from the client to the server.
#[derive(Debug)]
#derive_serialize
#vis enum Request {
#vis enum #request_ident {
#( #camel_case_idents{ #args } ),*
}
/// The response sent over the wire from the server to the client.
#[derive(Debug)]
#derive_serialize
#vis enum Response {
#vis enum #response_ident {
#( #camel_case_idents(#outputs) ),*
}
/// A future resolving to a server [`Response`].
#vis enum ResponseFut<S: #ident> {
/// A future resolving to a server response.
#vis enum #response_fut_ident<S: #ident> {
#( #camel_case_idents(<S as #service_name_repeated>::#future_types) ),*
}
impl<S: #ident> std::fmt::Debug for ResponseFut<S> {
impl<S: #ident> std::fmt::Debug for #response_fut_ident<S> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("ResponseFut").finish()
fmt.debug_struct(#response_fut_name).finish()
}
}
impl<S: #ident> std::future::Future for ResponseFut<S> {
type Output = std::io::Result<Response>;
impl<S: #ident> std::future::Future for #response_fut_ident<S> {
type Output = #response_ident;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>)
-> std::task::Poll<std::io::Result<Response>>
-> std::task::Poll<#response_ident>
{
unsafe {
match std::pin::Pin::get_unchecked_mut(self) {
#(
ResponseFut::#camel_case_idents(resp) =>
#response_fut_ident_repeated::#camel_case_idents(resp) =>
std::pin::Pin::new_unchecked(resp)
.poll(cx)
.map(Response::#camel_case_idents2)
.map(Ok),
.map(#response_ident_repeated::#camel_case_idents2),
)*
}
}
}
}
/// Returns a serving function to use with tarpc::server::Server.
#vis fn serve<S: #ident>(service: S)
-> impl FnOnce(tarpc::context::Context, Request) -> ResponseFut<S> + Send + 'static + Clone {
move |ctx, req| {
match req {
#(
Request::#camel_case_idents{ #arg_vars } => {
ResponseFut::#camel_case_idents2(
#service_name_repeated2::#method_names(
service.clone(), ctx, #arg_vars2))
}
)*
}
}
}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
#vis struct #client_ident<C = tarpc::client::Channel<Request, Response>>(C);
/// Returns a new client stub that sends requests over the given transport.
#vis async fn new_stub<T>(config: tarpc::client::Config, transport: T)
-> std::io::Result<#client_ident>
where
T: tarpc::Transport<tarpc::ClientMessage<Request>, tarpc::Response<Response>> + Send + 'static,
{
Ok(#client_ident(tarpc::client::new(config, transport).await?))
}
#vis struct #client_ident<C = tarpc::client::Channel<#request_ident, #response_ident>>(C);
impl<C> From<C> for #client_ident<C>
where for <'a> C: tarpc::Client<'a, Request, Response = Response>
where for <'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident>
{
fn from(client: C) -> Self {
#client_ident(client)
}
}
impl #client_ident {
/// Returns a new client stub that sends requests over the given transport.
#vis async fn new<T>(config: tarpc::client::Config, transport: T)
-> std::io::Result<Self>
where
T: tarpc::Transport<tarpc::ClientMessage<#request_ident>, tarpc::Response<#response_ident>> + Send + 'static
{
Ok(#client_ident(tarpc::client::new(config, transport).await?))
}
}
impl<C> #client_ident<C>
where for<'a> C: tarpc::Client<'a, Request, Response = Response>
where for<'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident>
{
#(
#[allow(unused)]
#( #method_attrs )*
pub fn #method_names(&mut self, ctx: tarpc::context::Context, #args)
#vis_repeated fn #method_names(&mut self, ctx: tarpc::context::Context, #args)
-> impl std::future::Future<Output = std::io::Result<#outputs>> + '_ {
let request = Request::#camel_case_idents { #arg_vars };
let request = #request_ident_repeated2::#camel_case_idents { #arg_vars };
let resp = tarpc::Client::call(&mut self.0, ctx, request);
async move {
match resp.await? {
Response::#camel_case_idents2(msg) => std::result::Result::Ok(msg),
#response_ident_repeated2::#camel_case_idents2(msg) => std::result::Result::Ok(msg),
_ => unreachable!(),
}
}
@@ -310,33 +356,46 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
tokens.into()
}
fn convert_str(ident_str: &str) -> String {
fn snake_to_camel(ident_str: &str) -> String {
let mut camel_ty = String::new();
// Find the first non-underscore and add it capitalized.
let mut chars = ident_str.chars();
// Find the first non-underscore char, uppercase it, and append it.
// Guaranteed to succeed because all idents must have at least one non-underscore char.
camel_ty.extend(chars.find(|&c| c != '_').unwrap().to_uppercase());
// When we find an underscore, we remove it and capitalize the next char. To do this,
// we need to ensure the next char is not another underscore.
let mut chars = chars.coalesce(|c1, c2| {
if c1 == '_' && c2 == '_' {
Ok(c1)
} else {
Err((c1, c2))
}
});
let mut last_char_was_underscore = true;
while let Some(c) = chars.next() {
if c != '_' {
camel_ty.push(c);
} else if let Some(c) = chars.next() {
camel_ty.extend(c.to_uppercase());
match c {
'_' => last_char_was_underscore = true,
c if last_char_was_underscore => {
camel_ty.extend(c.to_uppercase());
last_char_was_underscore = false;
}
c => camel_ty.extend(c.to_lowercase()),
}
}
camel_ty
}
#[test]
fn snake_to_camel_basic() {
assert_eq!(snake_to_camel("abc_def"), "AbcDef");
}
#[test]
fn snake_to_camel_underscore_suffix() {
assert_eq!(snake_to_camel("abc_def_"), "AbcDef");
}
#[test]
fn snake_to_camel_underscore_prefix() {
assert_eq!(snake_to_camel("_abc_def"), "AbcDef");
}
#[test]
fn snake_to_camel_underscore_consecutive() {
assert_eq!(snake_to_camel("abc__def"), "AbcDef");
}
#[test]
fn snake_to_camel_capital_in_middle() {
assert_eq!(snake_to_camel("aBc_dEf"), "AbcDef");
}

View File

@@ -6,7 +6,7 @@ use tarpc::context;
fn att_service_trait() {
use futures::future::{ready, Ready};
#[tarpc_plugins::service]
#[tarpc::service]
trait Foo {
async fn two_part(s: String, i: i32) -> (String, i32);
async fn bar(s: String) -> String;
@@ -33,7 +33,7 @@ fn att_service_trait() {
#[test]
fn syntax() {
#[tarpc_plugins::service]
#[tarpc::service]
trait Syntax {
#[deny(warnings)]
#[allow(non_snake_case)]

View File

@@ -180,7 +180,15 @@ impl<Resp> Future for DispatchResponse<Resp> {
Poll::Ready(match resp {
Ok(resp) => {
self.complete = true;
Ok(resp.message?)
match resp {
Ok(resp) => Ok(resp.message?),
Err(oneshot::Canceled) => {
// The oneshot is Canceled when the dispatch task ends. In that case,
// there's nothing listening on the other side, so there's no point in
// propagating cancellation.
Err(io::Error::from(io::ErrorKind::ConnectionReset))
}
}
}
Err(e) => Err({
let trace_id = *self.as_mut().ctx().trace_id();
@@ -211,7 +219,7 @@ impl<Resp> Future for DispatchResponse<Resp> {
self.complete = true;
io::Error::from(io::ErrorKind::ConnectionReset)
} else {
panic!("[{}] Unrecognized deadline error: {}", trace_id, e)
panic!("[{}] Unrecognized deadline error: {:?}", trace_id, e)
}
}),
})

View File

@@ -23,7 +23,6 @@ use humantime::format_rfc3339;
use log::{debug, error, info, trace, warn};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::{
error::Error as StdError,
fmt,
hash::Hash,
io,
@@ -113,29 +112,29 @@ impl<Req, Resp> Server<Req, Resp> {
/// The future driving the server.
#[derive(Debug)]
pub struct Running<S, F> {
incoming: S,
request_handler: F,
pub struct Running<St, Se> {
incoming: St,
server: Se,
}
impl<S, F> Running<S, F> {
unsafe_pinned!(incoming: S);
unsafe_unpinned!(request_handler: F);
impl<St, Se> Running<St, Se> {
unsafe_pinned!(incoming: St);
unsafe_unpinned!(server: Se);
}
impl<S, C, F, Fut> Future for Running<S, F>
impl<St, C, Se> Future for Running<St, Se>
where
S: Sized + Stream<Item = C>,
St: Sized + Stream<Item = C>,
C: Channel + Send + 'static,
F: FnOnce(context::Context, C::Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<C::Resp>> + Send + 'static,
Se: Serve<C::Req, Resp = C::Resp> + Send + 'static,
Se::Fut: Send + 'static
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
if let Err(e) =
crate::spawn(channel.respond_with(self.as_mut().request_handler().clone()))
crate::spawn(channel.respond_with(self.as_mut().server().clone()))
{
warn!("Failed to spawn channel handler: {:?}", e);
}
@@ -145,6 +144,30 @@ where
}
}
/// Basically a Fn(Req) -> impl Future<Output = Resp>;
pub trait Serve<Req>: Sized + Clone {
/// Type of response.
type Resp;
/// Type of response future.
type Fut: Future<Output = Self::Resp>;
/// Responds to a single request.
fn serve(self, ctx: context::Context, req: Req) -> Self::Fut;
}
impl<Req, Resp, Fut, F> Serve<Req> for F
where F: FnOnce(context::Context, Req) -> Fut + Clone,
Fut: Future<Output = Resp>
{
type Resp = Resp;
type Fut = Fut;
fn serve(self, ctx: context::Context, req: Req) -> Self::Fut {
self(ctx, req)
}
}
/// A utility trait enabling a stream to fluently chain a request handler.
pub trait Handler<C>
where
@@ -165,15 +188,15 @@ where
ThrottlerStream::new(self, n)
}
/// Responds to all requests with `request_handler`.
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
/// Responds to all requests with `server`.
fn respond_with<S>(self, server: S) -> Running<Self, S>
where
F: FnOnce(context::Context, C::Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<C::Resp>> + Send + 'static,
S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
S::Fut: Send + 'static,
{
Running {
incoming: self,
request_handler,
server,
}
}
}
@@ -291,10 +314,10 @@ where
/// Respond to requests coming over the channel with `f`. Returns a future that drives the
/// responses and resolves when the connection is closed.
fn respond_with<F, Fut>(self, f: F) -> ResponseHandler<Self, F>
fn respond_with<S>(self, server: S) -> ResponseHandler<Self, S>
where
F: FnOnce(context::Context, Self::Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Self::Resp>> + Send + 'static,
S: Serve<Self::Req, Resp = Self::Resp> + Send + 'static,
S::Fut: Send + 'static,
Self: Sized,
{
let (responses_tx, responses) = mpsc::channel(self.config().pending_response_buffer);
@@ -302,7 +325,7 @@ where
ResponseHandler {
channel: self,
f,
server,
pending_responses: responses,
responses_tx,
}
@@ -406,7 +429,7 @@ where
/// A running handler serving all requests coming over a channel.
#[derive(Debug)]
pub struct ResponseHandler<C, F>
pub struct ResponseHandler<C, S>
where
C: Channel,
{
@@ -416,10 +439,10 @@ where
/// Handed out to request handlers to fan in responses.
responses_tx: mpsc::Sender<(context::Context, Response<C::Resp>)>,
/// Request handler.
f: F,
server: S,
}
impl<C, F> ResponseHandler<C, F>
impl<C, S> ResponseHandler<C, S>
where
C: Channel,
{
@@ -428,14 +451,14 @@ where
unsafe_pinned!(responses_tx: mpsc::Sender<(context::Context, Response<C::Resp>)>);
// For this to be safe, field f must be private, and code in this module must never
// construct PinMut<F>.
unsafe_unpinned!(f: F);
unsafe_unpinned!(server: S);
}
impl<C, F, Fut> ResponseHandler<C, F>
impl<C, S> ResponseHandler<C, S>
where
C: Channel,
F: FnOnce(context::Context, C::Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<C::Resp>> + Send + 'static,
S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
S::Fut: Send + 'static,
{
fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
match ready!(self.as_mut().channel().poll_next(cx)?) {
@@ -516,7 +539,7 @@ where
let mut response_tx = self.as_mut().responses_tx().clone();
let trace_id = *ctx.trace_id();
let response = self.as_mut().f().clone()(ctx, request);
let response = self.as_mut().server().clone().serve(ctx, request);
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
move |result| {
async move {
@@ -550,11 +573,11 @@ where
}
}
impl<C, F, Fut> Future for ResponseHandler<C, F>
impl<C, S> Future for ResponseHandler<C, S>
where
C: Channel,
F: FnOnce(context::Context, C::Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<C::Resp>> + Send + 'static,
S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
S::Fut: Send + 'static,
{
type Output = ();
@@ -581,7 +604,7 @@ where
}
fn make_server_error(
e: timeout::Error<io::Error>,
e: timeout::Error<()>,
trace_id: TraceId,
deadline: SystemTime,
) -> ServerError {
@@ -601,26 +624,20 @@ fn make_server_error(
}
} else if e.is_timer() {
error!(
"[{}] Response failed because of an issue with a timer: {}",
"[{}] Response failed because of an issue with a timer: {:?}",
trace_id, e
);
ServerError {
kind: io::ErrorKind::Other,
detail: Some(format!("{}", e)),
}
} else if e.is_inner() {
let e = e.into_inner().unwrap();
ServerError {
kind: e.kind(),
detail: Some(e.description().into()),
detail: Some(format!("{:?}", e)),
}
} else {
error!("[{}] Unexpected response failure: {}", trace_id, e);
error!("[{}] Unexpected response failure: {:?}", trace_id, e);
ServerError {
kind: io::ErrorKind::Other,
detail: Some(format!("Server unexpectedly failed to respond: {}", e)),
detail: Some(format!("Server unexpectedly failed to respond: {:?}", e)),
}
}
}

View File

@@ -93,9 +93,9 @@ mod tests {
let (client_channel, server_channel) = transport::channel::unbounded();
crate::spawn(
Server::<String, u64>::default()
Server::default()
.incoming(stream::once(future::ready(server_channel)))
.respond_with(|_ctx, request| {
.respond_with(|_ctx, request: String| {
future::ready(request.parse::<u64>().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
@@ -108,8 +108,8 @@ mod tests {
let mut client = client::new(client::Config::default(), client_channel).await?;
let response1 = client.call(context::current(), "123".into()).await;
let response2 = client.call(context::current(), "abc".into()).await;
let response1 = client.call(context::current(), "123".into()).await?;
let response2 = client.call(context::current(), "abc".into()).await?;
trace!("response1: {:?}, response2: {:?}", response1, response2);

View File

@@ -46,16 +46,15 @@ impl<T> Deadline<T> {
}
impl<T> Future for Deadline<T>
where
T: TryFuture,
T: Future,
{
type Output = Result<T::Ok, timeout::Error<T::Error>>;
type Output = Result<T::Output, timeout::Error<()>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// First, try polling the future
match self.as_mut().future().try_poll(cx) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
match self.as_mut().future().poll(cx) {
Poll::Ready(v) => return Poll::Ready(Ok(v)),
Poll::Pending => {}
Poll::Ready(Err(e)) => return Poll::Ready(Err(timeout::Error::inner(e))),
}
let delay = self.delay().poll_unpin(cx);

View File

@@ -28,9 +28,9 @@ assert_matches = "1.0"
bincode = "1.0"
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
bytes = { version = "0.4", features = ["serde"] }
env_logger = "0.6"
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
humantime = "1.0"
env_logger = "0.6"
runtime = "0.3.0-alpha.6"
runtime-tokio = "0.3.0-alpha.5"
tokio-tcp = "0.1"

View File

@@ -7,11 +7,11 @@
#![feature(async_await, existential_type)]
use futures::{
compat::Executor01CompatExt,
future::{self, Ready},
prelude::*,
Future,
};
use publisher::Publisher as _;
use rpc::{
client, context,
server::{self, Handler},
@@ -24,22 +24,20 @@ use std::{
thread,
time::Duration,
};
use subscriber::Subscriber as _;
pub mod subscriber {
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
pub trait Subscriber {
async fn receive(message: String);
}
}
pub mod publisher {
use std::net::SocketAddr;
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
pub trait Publisher {
async fn broadcast(message: String);
async fn subscribe(id: u32, address: SocketAddr) -> Result<(), String>;
async fn unsubscribe(id: u32);
@@ -51,7 +49,7 @@ struct Subscriber {
id: u32,
}
impl subscriber::Service for Subscriber {
impl subscriber::Subscriber for Subscriber {
type ReceiveFut = Ready<()>;
fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut {
@@ -69,7 +67,7 @@ impl Subscriber {
server::new(config)
.incoming(incoming)
.take(1)
.respond_with(subscriber::serve(Subscriber { id })),
.respond_with(Subscriber { id }.serve()),
);
Ok(addr)
}
@@ -77,7 +75,7 @@ impl Subscriber {
#[derive(Clone, Debug)]
struct Publisher {
clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>,
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
}
impl Publisher {
@@ -88,11 +86,14 @@ impl Publisher {
}
}
impl publisher::Service for Publisher {
impl publisher::Publisher for Publisher {
existential type BroadcastFut: Future<Output = ()>;
fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut {
async fn broadcast(clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>, message: String) {
async fn broadcast(
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
message: String,
) {
let mut clients = clients.lock().unwrap().clone();
for client in clients.values_mut() {
// Ignore failing subscribers. In a real pubsub,
@@ -109,12 +110,13 @@ impl publisher::Service for Publisher {
fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
async fn subscribe(
clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>,
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
id: u32,
addr: SocketAddr,
) -> io::Result<()> {
let conn = bincode_transport::connect(&addr).await?;
let subscriber = subscriber::new_stub(client::Config::default(), conn).await?;
let subscriber =
subscriber::SubscriberClient::new(client::Config::default(), conn).await?;
eprintln!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
Ok(())
@@ -149,7 +151,7 @@ async fn main() -> io::Result<()> {
transport
.take(1)
.map(server::BaseChannel::with_defaults)
.respond_with(publisher::serve(Publisher::new())),
.respond_with(Publisher::new().serve()),
);
let subscriber1 = Subscriber::listen(0, server::Config::default()).await?;
@@ -157,7 +159,8 @@ async fn main() -> io::Result<()> {
let publisher_conn = bincode_transport::connect(&publisher_addr);
let publisher_conn = publisher_conn.await?;
let mut publisher = publisher::new_stub(client::Config::default(), publisher_conn).await?;
let mut publisher =
publisher::PublisherClient::new(client::Config::default(), publisher_conn).await?;
if let Err(e) = publisher
.subscribe(context::current(), 0, subscriber1)

View File

@@ -16,20 +16,19 @@ use rpc::{
};
use std::io;
// This is the service definition. It looks a lot like a trait definition.
// It defines one RPC, hello, which takes one arg, name, and returns a String.
/// This is the service definition. It looks a lot like a trait definition.
/// It defines one RPC, hello, which takes one arg, name, and returns a String.
#[tarpc::service]
pub trait Service {
pub trait World {
async fn hello(name: String) -> String;
}
// This is the type that implements the generated Service trait. It is the business logic
// and is used to start the server.
/// This is the type that implements the generated World trait. It is the business logic
/// and is used to start the server.
#[derive(Clone)]
struct HelloServer;
impl Service for HelloServer {
impl World for HelloServer {
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
// an associated type representing the future output by the fn.
@@ -56,22 +55,21 @@ async fn main() -> io::Result<()> {
// BaseChannel is the most basic channel, simply wrapping a transport with no added
// functionality.
BaseChannel::with_defaults(client)
// serve is generated by the tarpc::service! macro. It takes as input any type implementing
// the generated Service trait.
.respond_with(serve(HelloServer))
// serve_world is generated by the tarpc::service attribute. It takes as input any type
// implementing the generated World trait.
.respond_with(HelloServer.serve())
.await;
};
let _ = runtime::spawn(server);
let transport = bincode_transport::connect(&addr).await?;
// new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = new_stub(client::Config::default(), transport).await?;
// WorldClient is generated by the tarpc::service attribute. It has a constructor `new` that
// takes a config and any Transport as input.
let mut client = WorldClient::new(client::Config::default(), transport).await?;
// The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;

View File

@@ -6,7 +6,7 @@
#![feature(existential_type, async_await)]
use crate::{add::Service as AddService, double::Service as DoubleService};
use crate::{add::Add as AddService, double::Double as DoubleService};
use futures::{
future::{self, Ready},
prelude::*,
@@ -18,20 +18,16 @@ use rpc::{
use std::io;
pub mod add {
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
pub trait Add {
/// Add two ints together.
async fn add(x: i32, y: i32) -> i32;
}
}
pub mod double {
pub use ServiceClient as Client;
#[tarpc::service]
pub trait Service {
pub trait Double {
/// 2 * x
async fn double(x: i32) -> Result<i32, String>;
}
@@ -50,14 +46,14 @@ impl AddService for AddServer {
#[derive(Clone)]
struct DoubleServer {
add_client: add::Client,
add_client: add::AddClient,
}
impl DoubleService for DoubleServer {
existential type DoubleFut: Future<Output = Result<i32, String>> + Send;
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
async fn double(mut client: add::Client, x: i32) -> Result<i32, String> {
async fn double(mut client: add::AddClient, x: i32) -> Result<i32, String> {
client
.add(context::current(), x, x)
.await
@@ -78,11 +74,11 @@ async fn main() -> io::Result<()> {
let add_server = Server::default()
.incoming(add_listener)
.take(1)
.respond_with(add::serve(AddServer));
.respond_with(AddServer.serve());
let _ = runtime::spawn(add_server);
let to_add_server = bincode_transport::connect(&addr).await?;
let add_client = add::new_stub(client::Config::default(), to_add_server).await?;
let add_client = add::AddClient::new(client::Config::default(), to_add_server).await?;
let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?
.filter_map(|r| future::ready(r.ok()));
@@ -90,11 +86,12 @@ async fn main() -> io::Result<()> {
let double_server = rpc::Server::default()
.incoming(double_listener)
.take(1)
.respond_with(double::serve(DoubleServer { add_client }));
.respond_with(DoubleServer { add_client }.serve());
let _ = runtime::spawn(double_server);
let to_double_server = bincode_transport::connect(&addr).await?;
let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?;
let mut double_client =
double::DoubleClient::new(client::Config::default(), to_double_server).await?;
for i in 1..=5 {
eprintln!("{:?}", double_client.double(context::current(), i).await?);

View File

@@ -1,13 +1,20 @@
#![feature(async_await)]
#[cfg(not(feature = "serde1"))]
use std::rc::Rc;
use assert_matches::assert_matches;
use futures::{
future::{ready, Ready},
prelude::*,
};
#[cfg(feature = "serde1")]
use std::io;
use tarpc::{client, context, server::Handler, transport::channel};
use tarpc::{
client, context,
server::{self, BaseChannel, Channel, Handler},
transport::channel,
};
#[tarpc_plugins::service]
trait Service {
@@ -33,25 +40,24 @@ impl Service for Server {
}
#[runtime::test(runtime_tokio::TokioCurrentThread)]
async fn sequential() {
async fn sequential() -> io::Result<()> {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
let _ = runtime::spawn(
tarpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(serve(Server)),
BaseChannel::new(server::Config::default(), rx)
.respond_with(Server.serve())
);
let mut client = new_stub(client::Config::default(), tx).await.unwrap();
assert_eq!(3, client.add(context::current(), 1, 2).await.unwrap());
assert_eq!(
"Hey, Tim.",
client
.hey(context::current(), "Tim".to_string())
.await
.unwrap()
);
let mut client = ServiceClient::new(client::Config::default(), tx).await?;
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
assert_matches!(
client.hey(context::current(), "Tim".into()).await,
Ok(ref s) if s == "Hey, Tim.");
Ok(())
}
#[cfg(feature = "serde1")]
@@ -64,11 +70,11 @@ async fn serde() -> io::Result<()> {
let _ = runtime::spawn(
tarpc::Server::default()
.incoming(transport.take(1).filter_map(|r| async { r.ok() }))
.respond_with(serve(Server)),
.respond_with(Server.serve()),
);
let transport = bincode_transport::connect(&addr).await?;
let mut client = new_stub(client::Config::default(), transport).await?;
let mut client = ServiceClient::new(client::Config::default(), transport).await?;
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
assert_matches!(
@@ -80,25 +86,30 @@ async fn serde() -> io::Result<()> {
}
#[runtime::test(runtime_tokio::TokioCurrentThread)]
async fn concurrent() {
async fn concurrent() -> io::Result<()> {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
let _ = runtime::spawn(
rpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(serve(Server)),
.respond_with(Server.serve()),
);
let client = new_stub(client::Config::default(), tx).await.unwrap();
let client = ServiceClient::new(client::Config::default(), tx).await?;
let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone();
let req2 = c.add(context::current(), 3, 4);
let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string());
assert_eq!(3, req1.await.unwrap());
assert_eq!(7, req2.await.unwrap());
assert_eq!("Hey, Tim.", req3.await.unwrap());
assert_matches!(req1.await, Ok(3));
assert_matches!(req2.await, Ok(7));
assert_matches!(req3.await, Ok(ref s) if s == "Hey, Tim.");
Ok(())
}