mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-01 00:51:25 +01:00
Add service name to generated items.
With this change, the service definitions don't need to be isolated in their own modules.
This commit is contained in:
41
README.md
41
README.md
@@ -36,12 +36,9 @@ Add to your `Cargo.toml` dependencies:
|
||||
tarpc = "0.18.0"
|
||||
```
|
||||
|
||||
The `service!` macro expands to a collection of items that form an rpc service.
|
||||
In the above example, the macro is called within the `hello_service` module.
|
||||
This module will contain a `Client` stub and `Service` trait. These generated
|
||||
types make it easy and ergonomic to write servers without dealing with
|
||||
serialization directly. Simply implement one of the generated traits, and you're
|
||||
off to the races!
|
||||
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||
These generated types make it easy and ergonomic to write servers with less boilerplate.
|
||||
Simply implement the generated service trait, and you're off to the races!
|
||||
|
||||
## Example
|
||||
|
||||
@@ -77,13 +74,13 @@ use std::io;
|
||||
// This is the service definition. It looks a lot like a trait definition.
|
||||
// It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
#[tarpc::service]
|
||||
trait Service {
|
||||
trait World {
|
||||
/// Returns a greeting for name.
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
```
|
||||
|
||||
This service definition generates a trait called `Service`. Next we need to
|
||||
This service definition generates a trait called `World`. Next we need to
|
||||
implement it for our Server struct.
|
||||
|
||||
```rust
|
||||
@@ -104,17 +101,17 @@ implement it for our Server struct.
|
||||
# // This is the service definition. It looks a lot like a trait definition.
|
||||
# // It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
# #[tarpc::service]
|
||||
# trait Service {
|
||||
# trait World {
|
||||
# /// Returns a greeting for name.
|
||||
# async fn hello(name: String) -> String;
|
||||
# }
|
||||
#
|
||||
// This is the type that implements the generated Service trait. It is the business logic
|
||||
// This is the type that implements the generated World trait. It is the business logic
|
||||
// and is used to start the server.
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl Service for HelloServer {
|
||||
impl World for HelloServer {
|
||||
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
|
||||
// an associated type representing the future output by the fn.
|
||||
|
||||
@@ -150,17 +147,17 @@ that uses bincode over TCP.
|
||||
# // This is the service definition. It looks a lot like a trait definition.
|
||||
# // It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
# #[tarpc::service]
|
||||
# trait Service {
|
||||
# trait World {
|
||||
# /// Returns a greeting for name.
|
||||
# async fn hello(name: String) -> String;
|
||||
# }
|
||||
#
|
||||
# // This is the type that implements the generated Service trait. It is the business logic
|
||||
# // This is the type that implements the generated World trait. It is the business logic
|
||||
# // and is used to start the server.
|
||||
# #[derive(Clone)]
|
||||
# struct HelloServer;
|
||||
#
|
||||
# impl Service for HelloServer {
|
||||
# impl World for HelloServer {
|
||||
# // Each defined rpc generates two items in the trait, a fn that serves the RPC, and
|
||||
# // an associated type representing the future output by the fn.
|
||||
#
|
||||
@@ -179,19 +176,19 @@ async fn main() -> io::Result<()> {
|
||||
// incoming() takes a stream of transports such as would be returned by
|
||||
// TcpListener::incoming (but a stream instead of an iterator).
|
||||
.incoming(stream::once(future::ready(server_transport)))
|
||||
// serve is generated by the service! macro. It takes as input any type implementing
|
||||
// the generated Service trait.
|
||||
.respond_with(serve(HelloServer));
|
||||
// serve_world is generated by the macro. It takes as input any type implementing
|
||||
// the generated World trait.
|
||||
.respond_with(serve_world(HelloServer));
|
||||
|
||||
let _ = runtime::spawn(server);
|
||||
|
||||
// new_stub is generated by the service! macro. Like Server, it takes a config and any
|
||||
// world_stub is generated by the macro. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the macro.
|
||||
// by the service mcro.
|
||||
let mut client = new_stub(client::Config::default(), client_transport).await?;
|
||||
// by the service attribute.
|
||||
let mut client = world_stub(client::Config::default(), client_transport).await?;
|
||||
|
||||
// The client has an RPC method for each RPC defined in service!. It takes the same args
|
||||
// as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
|
||||
// args as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||
let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
||||
|
||||
|
||||
@@ -44,13 +44,12 @@ async fn main() -> io::Result<()> {
|
||||
|
||||
let transport = json_transport::connect(&server_addr).await?;
|
||||
|
||||
// new_stub is generated by the service! macro. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the macro.
|
||||
// by the service mcro.
|
||||
let mut client = service::new_stub(client::Config::default(), transport).await?;
|
||||
// world_stub is generated by the service attribute. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the attribute.
|
||||
let mut client = service::world_stub(client::Config::default(), transport).await?;
|
||||
|
||||
// The client has an RPC method for each RPC defined in service!. It takes the same args
|
||||
// as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
|
||||
// args as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||
let hello = client.hello(context::current(), name).await?;
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
/// This is the service definition. It looks a lot like a trait definition.
|
||||
/// It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
#[tarpc::service]
|
||||
pub trait Service {
|
||||
pub trait World {
|
||||
/// Returns a greeting for name.
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
@@ -17,12 +17,12 @@ use tarpc::{
|
||||
server::{self, Channel, Handler},
|
||||
};
|
||||
|
||||
// This is the type that implements the generated Service trait. It is the business logic
|
||||
// This is the type that implements the generated World trait. It is the business logic
|
||||
// and is used to start the server.
|
||||
#[derive(Clone)]
|
||||
struct HelloServer(SocketAddr);
|
||||
|
||||
impl service::Service for HelloServer {
|
||||
impl service::World for HelloServer {
|
||||
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
|
||||
// an associated type representing the future output by the fn.
|
||||
|
||||
@@ -70,11 +70,11 @@ async fn main() -> io::Result<()> {
|
||||
.map(server::BaseChannel::with_defaults)
|
||||
// Limit channels to 1 per IP.
|
||||
.max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip())
|
||||
// serve is generated by the service! macro. It takes as input any type implementing
|
||||
// the generated Service trait.
|
||||
// serve is generated by the service attribute. It takes as input any type implementing
|
||||
// the generated World trait.
|
||||
.map(|channel| {
|
||||
let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap());
|
||||
channel.respond_with(service::serve(server))
|
||||
channel.respond_with(service::serve_world(server))
|
||||
})
|
||||
// Max 10 channels.
|
||||
.buffer_unordered(10)
|
||||
|
||||
@@ -196,6 +196,19 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
let service_name_repeated2 = service_name_repeated.clone();
|
||||
|
||||
let client_ident = Ident::new(&format!("{}Client", ident), ident.span());
|
||||
let request_ident = Ident::new(&format!("{}Request", ident), ident.span());
|
||||
let request_ident_repeated = std::iter::repeat(request_ident.clone());
|
||||
let request_ident_repeated2 = request_ident_repeated.clone();
|
||||
let response_ident = Ident::new(&format!("{}Response", ident), ident.span());
|
||||
let response_ident_repeated = std::iter::repeat(response_ident.clone());
|
||||
let response_ident_repeated2 = response_ident_repeated.clone();
|
||||
let response_fut_name = format!("{}ResponseFut", ident);
|
||||
let response_fut_ident = Ident::new(&response_fut_name, ident.span());
|
||||
let response_fut_ident_repeated = std::iter::repeat(response_fut_ident.clone());
|
||||
let response_fut_ident_repeated2 = response_fut_ident_repeated.clone();
|
||||
let snake_ident = camel_to_snake(&ident.to_string());
|
||||
let serve_ident = Ident::new(&format!("serve_{}", snake_ident), ident.span());
|
||||
let stub_ident = Ident::new(&format!("{}_stub", snake_ident), ident.span());
|
||||
|
||||
#[cfg(feature = "serde1")]
|
||||
let derive_serialize = quote!(#[derive(serde::Serialize, serde::Deserialize)]);
|
||||
@@ -211,41 +224,41 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
/// The request sent over the wire from the client to the server.
|
||||
#[derive(Debug)]
|
||||
#derive_serialize
|
||||
#vis enum Request {
|
||||
#vis enum #request_ident {
|
||||
#( #camel_case_idents{ #args } ),*
|
||||
}
|
||||
|
||||
/// The response sent over the wire from the server to the client.
|
||||
#[derive(Debug)]
|
||||
#derive_serialize
|
||||
#vis enum Response {
|
||||
#vis enum #response_ident {
|
||||
#( #camel_case_idents(#outputs) ),*
|
||||
}
|
||||
|
||||
/// A future resolving to a server [`Response`].
|
||||
#vis enum ResponseFut<S: #ident> {
|
||||
/// A future resolving to a server response.
|
||||
#vis enum #response_fut_ident<S: #ident> {
|
||||
#( #camel_case_idents(<S as #service_name_repeated>::#future_types) ),*
|
||||
}
|
||||
|
||||
impl<S: #ident> std::fmt::Debug for ResponseFut<S> {
|
||||
impl<S: #ident> std::fmt::Debug for #response_fut_ident<S> {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
fmt.debug_struct("ResponseFut").finish()
|
||||
fmt.debug_struct(#response_fut_name).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: #ident> std::future::Future for ResponseFut<S> {
|
||||
type Output = std::io::Result<Response>;
|
||||
impl<S: #ident> std::future::Future for #response_fut_ident<S> {
|
||||
type Output = std::io::Result<#response_ident>;
|
||||
|
||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>)
|
||||
-> std::task::Poll<std::io::Result<Response>>
|
||||
-> std::task::Poll<std::io::Result<#response_ident>>
|
||||
{
|
||||
unsafe {
|
||||
match std::pin::Pin::get_unchecked_mut(self) {
|
||||
#(
|
||||
ResponseFut::#camel_case_idents(resp) =>
|
||||
#response_fut_ident_repeated::#camel_case_idents(resp) =>
|
||||
std::pin::Pin::new_unchecked(resp)
|
||||
.poll(cx)
|
||||
.map(Response::#camel_case_idents2)
|
||||
.map(#response_ident_repeated::#camel_case_idents2)
|
||||
.map(Ok),
|
||||
)*
|
||||
}
|
||||
@@ -254,13 +267,13 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
}
|
||||
|
||||
/// Returns a serving function to use with tarpc::server::Server.
|
||||
#vis fn serve<S: #ident>(service: S)
|
||||
-> impl FnOnce(tarpc::context::Context, Request) -> ResponseFut<S> + Send + 'static + Clone {
|
||||
#vis fn #serve_ident<S: #ident>(service: S)
|
||||
-> impl FnOnce(tarpc::context::Context, #request_ident) -> #response_fut_ident<S> + Send + 'static + Clone {
|
||||
move |ctx, req| {
|
||||
match req {
|
||||
#(
|
||||
Request::#camel_case_idents{ #arg_vars } => {
|
||||
ResponseFut::#camel_case_idents2(
|
||||
#request_ident_repeated::#camel_case_idents{ #arg_vars } => {
|
||||
#response_fut_ident_repeated2::#camel_case_idents2(
|
||||
#service_name_repeated2::#method_names(
|
||||
service.clone(), ctx, #arg_vars2))
|
||||
}
|
||||
@@ -272,19 +285,19 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
#[allow(unused)]
|
||||
#[derive(Clone, Debug)]
|
||||
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
|
||||
#vis struct #client_ident<C = tarpc::client::Channel<Request, Response>>(C);
|
||||
#vis struct #client_ident<C = tarpc::client::Channel<#request_ident, #response_ident>>(C);
|
||||
|
||||
/// Returns a new client stub that sends requests over the given transport.
|
||||
#vis async fn new_stub<T>(config: tarpc::client::Config, transport: T)
|
||||
#vis async fn #stub_ident<T>(config: tarpc::client::Config, transport: T)
|
||||
-> std::io::Result<#client_ident>
|
||||
where
|
||||
T: tarpc::Transport<tarpc::ClientMessage<Request>, tarpc::Response<Response>> + Send + 'static,
|
||||
T: tarpc::Transport<tarpc::ClientMessage<#request_ident>, tarpc::Response<#response_ident>> + Send + 'static,
|
||||
{
|
||||
Ok(#client_ident(tarpc::client::new(config, transport).await?))
|
||||
}
|
||||
|
||||
impl<C> From<C> for #client_ident<C>
|
||||
where for <'a> C: tarpc::Client<'a, Request, Response = Response>
|
||||
where for <'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident>
|
||||
{
|
||||
fn from(client: C) -> Self {
|
||||
#client_ident(client)
|
||||
@@ -292,18 +305,18 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
}
|
||||
|
||||
impl<C> #client_ident<C>
|
||||
where for<'a> C: tarpc::Client<'a, Request, Response = Response>
|
||||
where for<'a> C: tarpc::Client<'a, #request_ident, Response = #response_ident>
|
||||
{
|
||||
#(
|
||||
#[allow(unused)]
|
||||
#( #method_attrs )*
|
||||
pub fn #method_names(&mut self, ctx: tarpc::context::Context, #args)
|
||||
-> impl std::future::Future<Output = std::io::Result<#outputs>> + '_ {
|
||||
let request = Request::#camel_case_idents { #arg_vars };
|
||||
let request = #request_ident_repeated2::#camel_case_idents { #arg_vars };
|
||||
let resp = tarpc::Client::call(&mut self.0, ctx, request);
|
||||
async move {
|
||||
match resp.await? {
|
||||
Response::#camel_case_idents2(msg) => std::result::Result::Ok(msg),
|
||||
#response_ident_repeated2::#camel_case_idents2(msg) => std::result::Result::Ok(msg),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -334,6 +347,28 @@ fn snake_to_camel(ident_str: &str) -> String {
|
||||
camel_ty
|
||||
}
|
||||
|
||||
// Really basic camel to snake that assumes capitals are always the start of a new segment.
|
||||
fn camel_to_snake(ident_str: &str) -> String {
|
||||
let mut snake = String::new();
|
||||
let mut chars = ident_str.chars();
|
||||
if let Some(c) = chars.next() {
|
||||
snake.extend(c.to_lowercase());
|
||||
}
|
||||
|
||||
while let Some(c) = chars.next() {
|
||||
if c.is_uppercase() {
|
||||
// New word
|
||||
snake.push('_');
|
||||
snake.extend(c.to_lowercase());
|
||||
} else {
|
||||
// Same word
|
||||
snake.push(c)
|
||||
}
|
||||
}
|
||||
|
||||
snake
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snake_to_camel_basic() {
|
||||
assert_eq!(snake_to_camel("abc_def"), "AbcDef");
|
||||
@@ -358,3 +393,8 @@ fn snake_to_camel_underscore_consecutive() {
|
||||
fn snake_to_camel_capital_in_middle() {
|
||||
assert_eq!(snake_to_camel("aBc_dEf"), "AbcDef");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn camel_to_snake_basic() {
|
||||
assert_eq!(camel_to_snake("AbcDef"), "abc_def");
|
||||
}
|
||||
|
||||
@@ -28,9 +28,9 @@ assert_matches = "1.0"
|
||||
bincode = "1.0"
|
||||
bincode-transport = { package = "tarpc-bincode-transport", version = "0.7", path = "../bincode-transport" }
|
||||
bytes = { version = "0.4", features = ["serde"] }
|
||||
env_logger = "0.6"
|
||||
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
|
||||
humantime = "1.0"
|
||||
env_logger = "0.6"
|
||||
runtime = "0.3.0-alpha.6"
|
||||
runtime-tokio = "0.3.0-alpha.5"
|
||||
tokio-tcp = "0.1"
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
#![feature(async_await, existential_type)]
|
||||
|
||||
use futures::{
|
||||
compat::Executor01CompatExt,
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
Future,
|
||||
@@ -26,20 +25,17 @@ use std::{
|
||||
};
|
||||
|
||||
pub mod subscriber {
|
||||
pub use ServiceClient as Client;
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait Service {
|
||||
pub trait Subscriber {
|
||||
async fn receive(message: String);
|
||||
}
|
||||
}
|
||||
|
||||
pub mod publisher {
|
||||
use std::net::SocketAddr;
|
||||
pub use ServiceClient as Client;
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait Service {
|
||||
pub trait Publisher {
|
||||
async fn broadcast(message: String);
|
||||
async fn subscribe(id: u32, address: SocketAddr) -> Result<(), String>;
|
||||
async fn unsubscribe(id: u32);
|
||||
@@ -51,7 +47,7 @@ struct Subscriber {
|
||||
id: u32,
|
||||
}
|
||||
|
||||
impl subscriber::Service for Subscriber {
|
||||
impl subscriber::Subscriber for Subscriber {
|
||||
type ReceiveFut = Ready<()>;
|
||||
|
||||
fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut {
|
||||
@@ -69,7 +65,7 @@ impl Subscriber {
|
||||
server::new(config)
|
||||
.incoming(incoming)
|
||||
.take(1)
|
||||
.respond_with(subscriber::serve(Subscriber { id })),
|
||||
.respond_with(subscriber::serve_subscriber(Subscriber { id })),
|
||||
);
|
||||
Ok(addr)
|
||||
}
|
||||
@@ -77,7 +73,7 @@ impl Subscriber {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Publisher {
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>,
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
|
||||
}
|
||||
|
||||
impl Publisher {
|
||||
@@ -88,11 +84,14 @@ impl Publisher {
|
||||
}
|
||||
}
|
||||
|
||||
impl publisher::Service for Publisher {
|
||||
impl publisher::Publisher for Publisher {
|
||||
existential type BroadcastFut: Future<Output = ()>;
|
||||
|
||||
fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut {
|
||||
async fn broadcast(clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>, message: String) {
|
||||
async fn broadcast(
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
|
||||
message: String,
|
||||
) {
|
||||
let mut clients = clients.lock().unwrap().clone();
|
||||
for client in clients.values_mut() {
|
||||
// Ignore failing subscribers. In a real pubsub,
|
||||
@@ -109,12 +108,13 @@ impl publisher::Service for Publisher {
|
||||
|
||||
fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
|
||||
async fn subscribe(
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::Client>>>,
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
|
||||
id: u32,
|
||||
addr: SocketAddr,
|
||||
) -> io::Result<()> {
|
||||
let conn = bincode_transport::connect(&addr).await?;
|
||||
let subscriber = subscriber::new_stub(client::Config::default(), conn).await?;
|
||||
let subscriber =
|
||||
subscriber::subscriber_stub(client::Config::default(), conn).await?;
|
||||
eprintln!("Subscribing {}.", id);
|
||||
clients.lock().unwrap().insert(id, subscriber);
|
||||
Ok(())
|
||||
@@ -149,7 +149,7 @@ async fn main() -> io::Result<()> {
|
||||
transport
|
||||
.take(1)
|
||||
.map(server::BaseChannel::with_defaults)
|
||||
.respond_with(publisher::serve(Publisher::new())),
|
||||
.respond_with(publisher::serve_publisher(Publisher::new())),
|
||||
);
|
||||
|
||||
let subscriber1 = Subscriber::listen(0, server::Config::default()).await?;
|
||||
@@ -157,7 +157,8 @@ async fn main() -> io::Result<()> {
|
||||
|
||||
let publisher_conn = bincode_transport::connect(&publisher_addr);
|
||||
let publisher_conn = publisher_conn.await?;
|
||||
let mut publisher = publisher::new_stub(client::Config::default(), publisher_conn).await?;
|
||||
let mut publisher =
|
||||
publisher::publisher_stub(client::Config::default(), publisher_conn).await?;
|
||||
|
||||
if let Err(e) = publisher
|
||||
.subscribe(context::current(), 0, subscriber1)
|
||||
|
||||
@@ -16,20 +16,19 @@ use rpc::{
|
||||
};
|
||||
use std::io;
|
||||
|
||||
// This is the service definition. It looks a lot like a trait definition.
|
||||
// It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
|
||||
/// This is the service definition. It looks a lot like a trait definition.
|
||||
/// It defines one RPC, hello, which takes one arg, name, and returns a String.
|
||||
#[tarpc::service]
|
||||
pub trait Service {
|
||||
pub trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
// This is the type that implements the generated Service trait. It is the business logic
|
||||
// and is used to start the server.
|
||||
/// This is the type that implements the generated World trait. It is the business logic
|
||||
/// and is used to start the server.
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
impl Service for HelloServer {
|
||||
impl World for HelloServer {
|
||||
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
|
||||
// an associated type representing the future output by the fn.
|
||||
|
||||
@@ -56,22 +55,21 @@ async fn main() -> io::Result<()> {
|
||||
// BaseChannel is the most basic channel, simply wrapping a transport with no added
|
||||
// functionality.
|
||||
BaseChannel::with_defaults(client)
|
||||
// serve is generated by the tarpc::service! macro. It takes as input any type implementing
|
||||
// the generated Service trait.
|
||||
.respond_with(serve(HelloServer))
|
||||
// serve_world is generated by the tarpc::service attribute. It takes as input any type
|
||||
// implementing the generated World trait.
|
||||
.respond_with(serve_world(HelloServer))
|
||||
.await;
|
||||
};
|
||||
let _ = runtime::spawn(server);
|
||||
|
||||
let transport = bincode_transport::connect(&addr).await?;
|
||||
|
||||
// new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any
|
||||
// Transport as input, and returns a Client, also generated by the macro.
|
||||
// by the service mcro.
|
||||
let mut client = new_stub(client::Config::default(), transport).await?;
|
||||
// world_stub is generated by the tarpc::service attribute. Like Server, it takes a config and
|
||||
// any Transport as input, and returns a Client, also generated by the attribute.
|
||||
let mut client = world_stub(client::Config::default(), transport).await?;
|
||||
|
||||
// The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args
|
||||
// as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
|
||||
// args as defined, with the addition of a Context, which is always the first arg. The Context
|
||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||
let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
#![feature(existential_type, async_await)]
|
||||
|
||||
use crate::{add::Service as AddService, double::Service as DoubleService};
|
||||
use crate::{add::Add as AddService, double::Double as DoubleService};
|
||||
use futures::{
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
@@ -18,20 +18,16 @@ use rpc::{
|
||||
use std::io;
|
||||
|
||||
pub mod add {
|
||||
pub use ServiceClient as Client;
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait Service {
|
||||
pub trait Add {
|
||||
/// Add two ints together.
|
||||
async fn add(x: i32, y: i32) -> i32;
|
||||
}
|
||||
}
|
||||
|
||||
pub mod double {
|
||||
pub use ServiceClient as Client;
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait Service {
|
||||
pub trait Double {
|
||||
/// 2 * x
|
||||
async fn double(x: i32) -> Result<i32, String>;
|
||||
}
|
||||
@@ -50,14 +46,14 @@ impl AddService for AddServer {
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DoubleServer {
|
||||
add_client: add::Client,
|
||||
add_client: add::AddClient,
|
||||
}
|
||||
|
||||
impl DoubleService for DoubleServer {
|
||||
existential type DoubleFut: Future<Output = Result<i32, String>> + Send;
|
||||
|
||||
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
|
||||
async fn double(mut client: add::Client, x: i32) -> Result<i32, String> {
|
||||
async fn double(mut client: add::AddClient, x: i32) -> Result<i32, String> {
|
||||
client
|
||||
.add(context::current(), x, x)
|
||||
.await
|
||||
@@ -78,11 +74,11 @@ async fn main() -> io::Result<()> {
|
||||
let add_server = Server::default()
|
||||
.incoming(add_listener)
|
||||
.take(1)
|
||||
.respond_with(add::serve(AddServer));
|
||||
.respond_with(add::serve_add(AddServer));
|
||||
let _ = runtime::spawn(add_server);
|
||||
|
||||
let to_add_server = bincode_transport::connect(&addr).await?;
|
||||
let add_client = add::new_stub(client::Config::default(), to_add_server).await?;
|
||||
let add_client = add::add_stub(client::Config::default(), to_add_server).await?;
|
||||
|
||||
let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?
|
||||
.filter_map(|r| future::ready(r.ok()));
|
||||
@@ -90,11 +86,12 @@ async fn main() -> io::Result<()> {
|
||||
let double_server = rpc::Server::default()
|
||||
.incoming(double_listener)
|
||||
.take(1)
|
||||
.respond_with(double::serve(DoubleServer { add_client }));
|
||||
.respond_with(double::serve_double(DoubleServer { add_client }));
|
||||
let _ = runtime::spawn(double_server);
|
||||
|
||||
let to_double_server = bincode_transport::connect(&addr).await?;
|
||||
let mut double_client = double::new_stub(client::Config::default(), to_double_server).await?;
|
||||
let mut double_client =
|
||||
double::double_stub(client::Config::default(), to_double_server).await?;
|
||||
|
||||
for i in 1..=5 {
|
||||
eprintln!("{:?}", double_client.double(context::current(), i).await?);
|
||||
|
||||
@@ -1,13 +1,20 @@
|
||||
#![feature(async_await)]
|
||||
|
||||
#[cfg(not(feature = "serde1"))]
|
||||
use std::rc::Rc;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{
|
||||
future::{ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
#[cfg(feature = "serde1")]
|
||||
|
||||
use std::io;
|
||||
use tarpc::{client, context, server::Handler, transport::channel};
|
||||
use tarpc::{
|
||||
client, context,
|
||||
server::{self, BaseChannel, Channel, Handler},
|
||||
transport::channel,
|
||||
};
|
||||
|
||||
#[tarpc_plugins::service]
|
||||
trait Service {
|
||||
@@ -33,25 +40,24 @@ impl Service for Server {
|
||||
}
|
||||
|
||||
#[runtime::test(runtime_tokio::TokioCurrentThread)]
|
||||
async fn sequential() {
|
||||
async fn sequential() -> io::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let (tx, rx) = channel::unbounded();
|
||||
|
||||
let _ = runtime::spawn(
|
||||
tarpc::Server::default()
|
||||
.incoming(stream::once(ready(rx)))
|
||||
.respond_with(serve(Server)),
|
||||
BaseChannel::new(server::Config::default(), rx)
|
||||
.respond_with(serve_service(Server))
|
||||
);
|
||||
|
||||
let mut client = new_stub(client::Config::default(), tx).await.unwrap();
|
||||
assert_eq!(3, client.add(context::current(), 1, 2).await.unwrap());
|
||||
assert_eq!(
|
||||
"Hey, Tim.",
|
||||
client
|
||||
.hey(context::current(), "Tim".to_string())
|
||||
.await
|
||||
.unwrap()
|
||||
);
|
||||
let mut client = service_stub(client::Config::default(), tx).await?;
|
||||
|
||||
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
|
||||
assert_matches!(
|
||||
client.hey(context::current(), "Tim".into()).await,
|
||||
Ok(ref s) if s == "Hey, Tim.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde1")]
|
||||
@@ -64,11 +70,11 @@ async fn serde() -> io::Result<()> {
|
||||
let _ = runtime::spawn(
|
||||
tarpc::Server::default()
|
||||
.incoming(transport.take(1).filter_map(|r| async { r.ok() }))
|
||||
.respond_with(serve(Server)),
|
||||
.respond_with(serve_service(Server)),
|
||||
);
|
||||
|
||||
let transport = bincode_transport::connect(&addr).await?;
|
||||
let mut client = new_stub(client::Config::default(), transport).await?;
|
||||
let mut client = service_stub(client::Config::default(), transport).await?;
|
||||
|
||||
assert_matches!(client.add(context::current(), 1, 2).await, Ok(3));
|
||||
assert_matches!(
|
||||
@@ -80,25 +86,30 @@ async fn serde() -> io::Result<()> {
|
||||
}
|
||||
|
||||
#[runtime::test(runtime_tokio::TokioCurrentThread)]
|
||||
async fn concurrent() {
|
||||
async fn concurrent() -> io::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let (tx, rx) = channel::unbounded();
|
||||
let _ = runtime::spawn(
|
||||
rpc::Server::default()
|
||||
.incoming(stream::once(ready(rx)))
|
||||
.respond_with(serve(Server)),
|
||||
.respond_with(serve_service(Server)),
|
||||
);
|
||||
|
||||
let client = new_stub(client::Config::default(), tx).await.unwrap();
|
||||
let client = service_stub(client::Config::default(), tx).await?;
|
||||
|
||||
let mut c = client.clone();
|
||||
let req1 = c.add(context::current(), 1, 2);
|
||||
|
||||
let mut c = client.clone();
|
||||
let req2 = c.add(context::current(), 3, 4);
|
||||
|
||||
let mut c = client.clone();
|
||||
let req3 = c.hey(context::current(), "Tim".to_string());
|
||||
|
||||
assert_eq!(3, req1.await.unwrap());
|
||||
assert_eq!(7, req2.await.unwrap());
|
||||
assert_eq!("Hey, Tim.", req3.await.unwrap());
|
||||
assert_matches!(req1.await, Ok(3));
|
||||
assert_matches!(req2.await, Ok(7));
|
||||
assert_matches!(req3.await, Ok(ref s) if s == "Hey, Tim.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user