21 Commits

Author SHA1 Message Date
Tim Kuehn
e3f34917c5 Prepare v0.21.1 2020-08-02 21:34:13 -07:00
Tim Kuehn
f65dd05949 Enable documentation for optional features on docs.rs 2020-08-02 20:57:21 -07:00
Tim Kuehn
240c436b34 Ensure Context is Sync. 2020-08-01 14:01:07 -07:00
Tim Kuehn
c9803688cc Ensure Context is Send. 2020-08-01 13:49:25 -07:00
Tim Kuehn
4987094483 Compression example.
Follow-up work: some extension points would be useful allow enabling compression on a per-request basis.

Fixes https://github.com/google/tarpc/issues/200
2020-08-01 13:45:16 -07:00
Tim Kuehn
ff55080193 Minor refactor 2020-07-30 13:11:13 -07:00
Tim Kuehn
258193c932 PubSub example needs to populate the subscription topics. 2020-07-30 11:14:13 -07:00
Tim Kuehn
67823ef5de Get rid of sleeps in PubSub example. 2020-07-30 01:27:31 -07:00
Tim Kuehn
a671457243 Add topics to PubSub example 2020-07-29 22:51:04 -07:00
Tim Kuehn
cf654549da Add documentation to PubSub example. 2020-07-29 18:05:35 -07:00
Tim Kuehn
6a01e32a2d Shut down client dispatch immediately when read half of transport is closed.
Clients can't receive any responses when the read half is closed, which means they can't verify if their requests were served. Therefore, there is no point in writing further requests after the read half is closed.
2020-07-29 13:50:42 -07:00
Tim Kuehn
e6597fab03 Add some error context to client dispatch.
I'm taking this opportunity to experiment with anyhow. So far, results are promising. It was a bit hard to use with Poll<Option<Result<T, E>>> types, so I added a crate-internal helper trait for that.
2020-07-29 12:07:07 -07:00
Tim Kuehn
ebd245a93d Rewrite pubsub example to have the subscriber connect to the publisher.
Fixes https://github.com/google/tarpc/issues/313
2020-07-28 22:10:17 -07:00
Tim Kuehn
3ebc3b5845 Add accessor fns.
- ClientHandler::get_pin_channel
- BaseChannel::get_pin_ref
- serde_transport::Transport::get_ref
2020-07-28 21:27:36 -07:00
Tim Kuehn
0e5973109d Make docs.rs document feature-gated public items. 2020-07-28 19:43:43 -07:00
Tim Kuehn
5f02d7383a Add tests for correct diagnostic output from proc macro-generated compiler errors. 2020-07-27 01:17:06 -07:00
Tim Kuehn
2bae148529 Address clippy lints 2020-07-27 00:04:45 -07:00
Tim Kuehn
42a2e03aab Add better diagnostics for missing 'async' in impls using #[tarpc::server] 2020-07-26 23:47:48 -07:00
Tim Kuehn
b566d0c646 Use #[tarpc::server] in example-service 2020-07-26 18:26:41 -07:00
Jon Cinque
b359f16767 Add concurrent tests using join and join_all
These tests are essentially copies of the `concurrent` test,
specifically using `join` and `join_all`.  Note that for the `join_all`
example to work, all of the `Client` clones must be created before *any*
requests are added, otherwise there will be a lifetime problem with the
second request, saying that second client, `c2`, is still borrowed when
`req1` is dropped.  It would require a larger redesign to fix this
issue.
2020-07-24 09:51:05 -07:00
Greg Fitzgerald
f8681ab134 Migrate examples to tarpc::server 2020-07-22 14:03:23 -07:00
29 changed files with 882 additions and 245 deletions

View File

@@ -109,14 +109,10 @@ implement it for our Server struct.
#[derive(Clone)]
struct HelloServer;
#[tarpc::server]
impl World for HelloServer {
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
// an associated type representing the future output by the fn.
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
async fn hello(self, _: context::Context, name: String) -> String {
format!("Hello, {}!", name)
}
}
```

View File

@@ -1,3 +1,52 @@
## 0.21.1 (2020-08-02)
### New Features
#### #[tarpc::server] diagnostics
When a service impl uses #[tarpc::server], only `async fn`s are re-written. This can lead to
confusing compiler errors about missing associated types:
```
error: not all trait items implemented, missing: `HelloFut`
--> $DIR/tarpc_server_missing_async.rs:9:1
|
9 | impl World for HelloServer {
| ^^^^
```
The proc macro now provides better diagnostics for this case:
```
error: not all trait items implemented, missing: `HelloFut`
--> $DIR/tarpc_server_missing_async.rs:9:1
|
9 | impl World for HelloServer {
| ^^^^
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
--> $DIR/tarpc_server_missing_async.rs:10:5
|
10 | fn hello(name: String) -> String {
| ^^
```
### Bug Fixes
#### Fixed client hanging when server shuts down
Previously, clients would ignore when the read half of the transport was closed, continuing to
write requests. This didn't make much sense, because without the ability to receive responses,
clients have no way to know if requests were actually processed by the server. It basically just
led to clients that would hang for a few seconds before shutting down. This has now been
corrected: clients will immediately shut down when the read-half of the transport is closed.
#### More docs.rs documentation
Previously, docs.rs only documented items enabled by default, notably leaving out documentation
for tokio and serde features. This has now been corrected: docs.rs should have documentation
for all optional features.
## 0.21.0 (2020-06-26)
### New Features

View File

@@ -11,6 +11,8 @@ use tokio_serde::formats::Json;
#[tokio::main]
async fn main() -> io::Result<()> {
env_logger::init();
let flags = App::new("Hello Client")
.version("0.1")
.author("Tim <tikue@google.com>")

View File

@@ -5,10 +5,7 @@
// https://opensource.org/licenses/MIT.
use clap::{App, Arg};
use futures::{
future::{self, Ready},
prelude::*,
};
use futures::{future, prelude::*};
use service::World;
use std::{
io,
@@ -25,17 +22,10 @@ use tokio_serde::formats::Json;
#[derive(Clone)]
struct HelloServer(SocketAddr);
#[tarpc::server]
impl World for HelloServer {
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
// an associated type representing the future output by the fn.
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!(
"Hello, {}! You are connected from {:?}.",
name, self.0
))
async fn hello(self, _: context::Context, name: String) -> String {
format!("Hello, {}! You are connected from {:?}.", name, self.0)
}
}

View File

@@ -12,7 +12,7 @@ extern crate quote;
extern crate syn;
use proc_macro::TokenStream;
use proc_macro2::TokenStream as TokenStream2;
use proc_macro2::{Span, TokenStream as TokenStream2};
use quote::{format_ident, quote, ToTokens};
use syn::{
braced,
@@ -20,12 +20,24 @@ use syn::{
parenthesized,
parse::{Parse, ParseStream},
parse_macro_input, parse_quote, parse_str,
punctuated::Punctuated,
spanned::Spanned,
token::Comma,
Attribute, FnArg, Ident, ImplItem, ImplItemMethod, ImplItemType, ItemImpl, Lit, LitBool,
MetaNameValue, Pat, PatType, ReturnType, Token, Type, Visibility,
};
/// Accumulates multiple errors into a result.
/// Only use this for recoverable errors, i.e. non-parse errors. Fatal errors should early exit to
/// avoid further complications.
macro_rules! extend_errors {
($errors: ident, $e: expr) => {
match $errors {
Ok(_) => $errors = Err($e),
Err(ref mut errors) => errors.extend($e),
}
};
}
struct Service {
attrs: Vec<Attribute>,
vis: Visibility,
@@ -52,20 +64,31 @@ impl Parse for Service {
while !content.is_empty() {
rpcs.push(content.parse()?);
}
let mut ident_errors = Ok(());
for rpc in &rpcs {
if rpc.ident == "new" {
return Err(input.error(format!(
"method name conflicts with generated fn `{}Client::new`",
ident.unraw()
)));
extend_errors!(
ident_errors,
syn::Error::new(
rpc.ident.span(),
format!(
"method name conflicts with generated fn `{}Client::new`",
ident.unraw()
)
)
);
}
if rpc.ident == "serve" {
return Err(input.error(format!(
"method name conflicts with generated fn `{}::serve`",
ident
)));
extend_errors!(
ident_errors,
syn::Error::new(
rpc.ident.span(),
format!("method name conflicts with generated fn `{}::serve`", ident)
)
);
}
}
ident_errors?;
Ok(Self {
attrs,
@@ -84,17 +107,28 @@ impl Parse for RpcMethod {
let ident = input.parse()?;
let content;
parenthesized!(content in input);
let args: Punctuated<FnArg, Comma> = content.parse_terminated(FnArg::parse)?;
let args = args
.into_iter()
.map(|arg| match arg {
FnArg::Typed(captured) => match *captured.pat {
Pat::Ident(_) => Ok(captured),
_ => Err(input.error("patterns aren't allowed in RPC args")),
},
FnArg::Receiver(_) => Err(input.error("method args cannot start with self")),
})
.collect::<Result<_, _>>()?;
let mut args = Vec::new();
let mut errors = Ok(());
for arg in content.parse_terminated::<FnArg, Comma>(FnArg::parse)? {
match arg {
FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => {
args.push(captured);
}
FnArg::Typed(captured) => {
extend_errors!(
errors,
syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args")
);
}
FnArg::Receiver(_) => {
extend_errors!(
errors,
syn::Error::new(arg.span(), "method args cannot start with self")
);
}
}
}
errors?;
let output = input.parse()?;
input.parse::<Token![;]>()?;
@@ -113,32 +147,71 @@ struct DeriveSerde(bool);
impl Parse for DeriveSerde {
fn parse(input: ParseStream) -> syn::Result<Self> {
if input.is_empty() {
return Ok(Self(cfg!(feature = "serde1")));
}
match input.parse::<MetaNameValue>()? {
MetaNameValue {
ref path, ref lit, ..
} if path.segments.len() == 1
&& path.segments.first().unwrap().ident == "derive_serde" =>
{
match lit {
Lit::Bool(LitBool { value: true, .. }) if cfg!(feature = "serde1") => {
Ok(Self(true))
}
Lit::Bool(LitBool { value: true, .. }) => {
Err(input
.error("To enable serde, first enable the `serde1` feature of tarpc"))
}
Lit::Bool(LitBool { value: false, .. }) => Ok(Self(false)),
_ => Err(input.error("`derive_serde` expects a value of type `bool`")),
let mut result = Ok(None);
let mut derive_serde = Vec::new();
let meta_items = input.parse_terminated::<MetaNameValue, Comma>(MetaNameValue::parse)?;
for meta in meta_items {
if meta.path.segments.len() != 1 {
extend_errors!(
result,
syn::Error::new(
meta.span(),
"tarpc::service does not support this meta item"
)
);
continue;
}
let segment = meta.path.segments.first().unwrap();
if segment.ident != "derive_serde" {
extend_errors!(
result,
syn::Error::new(
meta.span(),
"tarpc::service does not support this meta item"
)
);
continue;
}
match meta.lit {
Lit::Bool(LitBool { value: true, .. }) if cfg!(feature = "serde1") => {
result = result.and(Ok(Some(true)))
}
Lit::Bool(LitBool { value: true, .. }) => {
extend_errors!(
result,
syn::Error::new(
meta.span(),
"To enable serde, first enable the `serde1` feature of tarpc"
)
);
}
Lit::Bool(LitBool { value: false, .. }) => result = result.and(Ok(Some(false))),
_ => extend_errors!(
result,
syn::Error::new(
meta.lit.span(),
"`derive_serde` expects a value of type `bool`"
)
),
}
_ => {
Err(input
.error("tarpc::service only supports one meta item, `derive_serde = {bool}`"))
derive_serde.push(meta);
}
if derive_serde.len() > 1 {
for (i, derive_serde) in derive_serde.iter().enumerate() {
extend_errors!(
result,
syn::Error::new(
derive_serde.span(),
format!(
"`derive_serde` appears more than once (occurrence #{})",
i + 1
)
)
);
}
}
let derive_serde = result?.unwrap_or(cfg!(feature = "serde1"));
Ok(Self(derive_serde))
}
}
@@ -212,6 +285,12 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
.into()
}
/// generate an identifier consisting of the method name to CamelCase with
/// Fut appended to it.
fn associated_type_for_rpc(method: &ImplItemMethod) -> String {
snake_to_camel(&method.sig.ident.unraw().to_string()) + "Fut"
}
/// Transforms an async function into a sync one, returning a type declaration
/// for the return type (a future).
fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
@@ -223,9 +302,7 @@ fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
ReturnType::Type(_, ret) => quote!(#ret),
};
// generate an identifier consisting of the method name to CamelCase with
// Fut appended to it.
let fut_name = snake_to_camel(&method.sig.ident.unraw().to_string()) + "Fut";
let fut_name = associated_type_for_rpc(method);
let fut_name_ident = Ident::new(&fut_name, method.sig.ident.span());
// generate the updated return signature.
@@ -308,22 +385,37 @@ fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
#[proc_macro_attribute]
pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream {
let mut item = syn::parse_macro_input!(input as ItemImpl);
let span = item.span();
// the generated type declarations
let mut types: Vec<ImplItemType> = Vec::new();
let mut expected_non_async_types: Vec<(&ImplItemMethod, String)> = Vec::new();
let mut found_non_async_types: Vec<&ImplItemType> = Vec::new();
for inner in &mut item.items {
if let ImplItem::Method(method) = inner {
let sig = &method.sig;
// if this function is declared async, transform it into a regular function
if sig.asyncness.is_some() {
let typedecl = transform_method(method);
types.push(typedecl);
match inner {
ImplItem::Method(method) => {
if method.sig.asyncness.is_some() {
// if this function is declared async, transform it into a regular function
let typedecl = transform_method(method);
types.push(typedecl);
} else {
// If it's not async, keep track of all required associated types for better
// error reporting.
expected_non_async_types.push((method, associated_type_for_rpc(method)));
}
}
ImplItem::Type(typedecl) => found_non_async_types.push(typedecl),
_ => {}
}
}
if let Err(e) =
verify_types_were_provided(span, &expected_non_async_types, &found_non_async_types)
{
return TokenStream::from(e.to_compile_error());
}
// add the type declarations into the impl block
for t in types.into_iter() {
item.items.push(syn::ImplItem::Type(t));
@@ -332,6 +424,39 @@ pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream {
TokenStream::from(quote!(#item))
}
fn verify_types_were_provided(
span: Span,
expected: &[(&ImplItemMethod, String)],
provided: &[&ImplItemType],
) -> syn::Result<()> {
let mut result = Ok(());
for (method, expected) in expected {
if provided
.iter()
.find(|typedecl| typedecl.ident == expected)
.is_none()
{
let mut e = syn::Error::new(
span,
format!("not all trait items implemented, missing: `{}`", expected),
);
let fn_span = method.sig.fn_token.span();
e.extend(syn::Error::new(
fn_span.join(method.sig.ident.span()).unwrap_or(fn_span),
format!(
"hint: `#[tarpc::server]` only rewrites async fns, and `fn {}` is not async",
method.sig.ident
),
));
match result {
Ok(_) => result = Err(e),
Err(ref mut error) => error.extend(Some(e)),
}
}
}
result
}
// Things needed to generate the service items: trait, serve impl, request/response enums, and
// the client stub.
struct ServiceGenerator<'a> {
@@ -398,7 +523,7 @@ impl<'a> ServiceGenerator<'a> {
#vis trait #service_ident: Clone {
#( #types_and_fns )*
/// Returns a serving function to use with tarpc::server::Server.
/// Returns a serving function to use with [tarpc::server::Channel::respond_with].
fn serve(self) -> #server_ident<Self> {
#server_ident { service: self }
}
@@ -412,6 +537,7 @@ impl<'a> ServiceGenerator<'a> {
} = self;
quote! {
/// A serving function to use with [tarpc::server::Channel::respond_with].
#[derive(Clone)]
#vis struct #server_ident<S> {
service: S,

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.21.0"
version = "0.21.1"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
license = "MIT"
@@ -26,6 +26,7 @@ full = ["serde1", "tokio1", "serde-transport", "tcp"]
travis-ci = { repository = "google/tarpc" }
[dependencies]
anyhow = "1.0"
fnv = "1.0"
futures = "0.3"
humantime = "1.0"
@@ -34,20 +35,29 @@ pin-project = "0.4.17"
rand = "0.7"
tokio = { version = "0.2", features = ["time"] }
serde = { optional = true, version = "1.0", features = ["derive"] }
static_assertions = "1.1.0"
tokio-util = { optional = true, version = "0.2" }
tarpc-plugins = { path = "../plugins", version = "0.8" }
tokio-serde = { optional = true, version = "0.6" }
[dev-dependencies]
assert_matches = "1.0"
bincode = "1.3"
bytes = { version = "0.5", features = ["serde"] }
env_logger = "0.6"
flate2 = "1.0.16"
futures = "0.3"
humantime = "1.0"
log = "0.4"
pin-utils = "0.1.0-alpha"
serde_bytes = "0.11"
tokio = { version = "0.2", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json"] }
tokio-serde = { version = "0.6", features = ["json", "bincode"] }
trybuild = "1.0"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[[example]]
name = "server_calling_server"

View File

@@ -0,0 +1,130 @@
use flate2::{read::DeflateDecoder, write::DeflateEncoder, Compression};
use futures::{Sink, SinkExt, Stream, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use std::{io, io::Read, io::Write};
use tarpc::{
client, context,
serde_transport::tcp,
server::{BaseChannel, Channel},
};
use tokio_serde::formats::Bincode;
/// Type of compression that should be enabled on the request. The transport is free to ignore this.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Deserialize, Serialize)]
pub enum CompressionAlgorithm {
Deflate,
}
#[derive(Debug, Deserialize, Serialize)]
pub enum CompressedMessage<T> {
Uncompressed(T),
Compressed {
algorithm: CompressionAlgorithm,
payload: ByteBuf,
},
}
#[derive(Deserialize, Serialize)]
enum CompressionType {
Uncompressed,
Compressed,
}
async fn compress<T>(message: T) -> io::Result<CompressedMessage<T>>
where
T: Serialize,
{
let message = serialize(message)?;
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&message).unwrap();
let compressed = encoder.finish()?;
Ok(CompressedMessage::Compressed {
algorithm: CompressionAlgorithm::Deflate,
payload: ByteBuf::from(compressed),
})
}
async fn decompress<T>(message: CompressedMessage<T>) -> io::Result<T>
where
for<'a> T: Deserialize<'a>,
{
match message {
CompressedMessage::Compressed { algorithm, payload } => {
if algorithm != CompressionAlgorithm::Deflate {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Compression algorithm {:?} not supported", algorithm),
));
}
let mut deflater = DeflateDecoder::new(payload.as_slice());
let mut payload = ByteBuf::new();
deflater.read_to_end(&mut payload)?;
let message = deserialize(payload)?;
Ok(message)
}
CompressedMessage::Uncompressed(message) => Ok(message),
}
}
fn serialize<T: Serialize>(t: T) -> io::Result<ByteBuf> {
bincode::serialize(&t)
.map(ByteBuf::from)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn deserialize<D>(message: ByteBuf) -> io::Result<D>
where
for<'a> D: Deserialize<'a>,
{
bincode::deserialize(message.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn add_compression<In, Out>(
transport: impl Stream<Item = io::Result<CompressedMessage<In>>>
+ Sink<CompressedMessage<Out>, Error = io::Error>,
) -> impl Stream<Item = io::Result<In>> + Sink<Out, Error = io::Error>
where
Out: Serialize,
for<'a> In: Deserialize<'a>,
{
transport.with(compress).and_then(decompress)
}
#[tarpc::service]
pub trait World {
async fn hello(name: String) -> String;
}
#[derive(Clone, Debug)]
struct HelloServer;
#[tarpc::server]
impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
format!("Hey, {}!", name)
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut incoming = tcp::listen("localhost:0", Bincode::default).await?;
let addr = incoming.local_addr();
tokio::spawn(async move {
let transport = incoming.next().await.unwrap().unwrap();
BaseChannel::with_defaults(add_compression(transport))
.respond_with(HelloServer.serve())
.execute()
.await;
});
let transport = tcp::connect(addr, Bincode::default()).await?;
let mut client =
WorldClient::new(client::Config::default(), add_compression(transport)).spawn()?;
println!(
"{}",
client.hello(context::current(), "friend".into()).await?
);
Ok(())
}

View File

@@ -4,192 +4,341 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
/// - The PubSub server sets up TCP listeners on 2 ports, the "subscriber" port and the "publisher"
/// port. Because both publishers and subscribers initiate their connections to the PubSub
/// server, the server requires no prior knowledge of either publishers or subscribers.
///
/// - Subscribers connect to the server on the server's "subscriber" port. Once a connection is
/// established, the server acts as the client of the Subscriber service, initially requesting
/// the topics the subscriber is interested in, and subsequently sending topical messages to the
/// subscriber.
///
/// - Publishers connect to the server on the "publisher" port and, once connected, they send
/// topical messages via Publisher service to the server. The server then broadcasts each
/// messages to all clients subscribed to the topic of that message.
///
/// Subscriber Publisher PubSub Server
/// T1 | | |
/// T2 |-----Connect------------------------------------------------------>|
/// T3 | | |
/// T2 |<-------------------------------------------------------Topics-----|
/// T2 |-----(OK) Topics-------------------------------------------------->|
/// T3 | | |
/// T4 | |-----Connect-------------------->|
/// T5 | | |
/// T6 | |-----Publish-------------------->|
/// T7 | | |
/// T8 |<------------------------------------------------------Receive-----|
/// T9 |-----(OK) Receive------------------------------------------------->|
/// T10 | | |
/// T11 | |<--------------(OK) Publish------|
use anyhow::anyhow;
use futures::{
future::{self, Ready},
channel::oneshot,
future::{self, AbortHandle},
prelude::*,
Future,
};
use log::info;
use publisher::Publisher as _;
use std::{
collections::HashMap,
io,
net::SocketAddr,
pin::Pin,
sync::{Arc, Mutex},
time::Duration,
sync::{Arc, Mutex, RwLock},
};
use subscriber::Subscriber as _;
use tarpc::{
client, context,
server::{self, Handler},
serde_transport::tcp,
server::{self, Channel},
};
use tokio::net::ToSocketAddrs;
use tokio_serde::formats::Json;
pub mod subscriber {
#[tarpc::service]
pub trait Subscriber {
async fn receive(message: String);
async fn topics() -> Vec<String>;
async fn receive(topic: String, message: String);
}
}
pub mod publisher {
use std::net::SocketAddr;
#[tarpc::service]
pub trait Publisher {
async fn broadcast(message: String);
async fn subscribe(id: u32, address: SocketAddr) -> Result<(), String>;
async fn unsubscribe(id: u32);
async fn publish(topic: String, message: String);
}
}
#[derive(Clone, Debug)]
struct Subscriber {
id: u32,
local_addr: SocketAddr,
topics: Vec<String>,
}
#[tarpc::server]
impl subscriber::Subscriber for Subscriber {
type ReceiveFut = Ready<()>;
async fn topics(self, _: context::Context) -> Vec<String> {
self.topics.clone()
}
fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut {
eprintln!("{} received message: {}", self.id, message);
future::ready(())
async fn receive(self, _: context::Context, topic: String, message: String) {
info!(
"[{}] received message on topic '{}': {}",
self.local_addr, topic, message
);
}
}
struct SubscriberHandle(AbortHandle);
impl Drop for SubscriberHandle {
fn drop(&mut self) {
self.0.abort();
}
}
impl Subscriber {
async fn listen(id: u32, config: server::Config) -> io::Result<SocketAddr> {
let incoming = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
.await?
.filter_map(|r| future::ready(r.ok()));
let addr = incoming.get_ref().local_addr();
tokio::spawn(
server::new(config)
.incoming(incoming)
.take(1)
.respond_with(Subscriber { id }.serve()),
);
Ok(addr)
async fn connect(
publisher_addr: impl ToSocketAddrs,
topics: Vec<String>,
) -> anyhow::Result<SubscriberHandle> {
let publisher = tcp::connect(publisher_addr, Json::default()).await?;
let local_addr = publisher.local_addr()?;
let mut handler = server::BaseChannel::with_defaults(publisher)
.respond_with(Subscriber { local_addr, topics }.serve());
// The first request is for the topics being subscriibed to.
match handler.next().await {
Some(init_topics) => init_topics?.await,
None => {
return Err(anyhow!(
"[{}] Server never initialized the subscriber.",
local_addr
))
}
};
let (handler, abort_handle) = future::abortable(handler.execute());
tokio::spawn(async move {
match handler.await {
Ok(()) | Err(future::Aborted) => info!("[{}] subscriber shutdown.", local_addr),
}
});
Ok(SubscriberHandle(abort_handle))
}
}
#[derive(Debug)]
struct Subscription {
subscriber: subscriber::SubscriberClient,
topics: Vec<String>,
}
#[derive(Clone, Debug)]
struct Publisher {
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
clients: Arc<Mutex<HashMap<SocketAddr, Subscription>>>,
subscriptions: Arc<RwLock<HashMap<String, HashMap<SocketAddr, subscriber::SubscriberClient>>>>,
}
struct PublisherAddrs {
publisher: SocketAddr,
subscriptions: SocketAddr,
}
impl Publisher {
fn new() -> Publisher {
Publisher {
clients: Arc::new(Mutex::new(HashMap::new())),
async fn start(self) -> io::Result<PublisherAddrs> {
let mut connecting_publishers = tcp::listen("localhost:0", Json::default).await?;
let publisher_addrs = PublisherAddrs {
publisher: connecting_publishers.local_addr(),
subscriptions: self.clone().start_subscription_manager().await?,
};
info!("[{}] listening for publishers.", publisher_addrs.publisher);
tokio::spawn(async move {
// Because this is just an example, we know there will only be one publisher. In more
// realistic code, this would be a loop to continually accept new publisher
// connections.
let publisher = connecting_publishers.next().await.unwrap().unwrap();
info!("[{}] publisher connected.", publisher.peer_addr().unwrap());
server::BaseChannel::with_defaults(publisher)
.respond_with(self.serve())
.execute()
.await
});
Ok(publisher_addrs)
}
async fn start_subscription_manager(mut self) -> io::Result<SocketAddr> {
let mut connecting_subscribers = tcp::listen("localhost:0", Json::default)
.await?
.filter_map(|r| future::ready(r.ok()));
let new_subscriber_addr = connecting_subscribers.get_ref().local_addr();
info!("[{}] listening for subscribers.", new_subscriber_addr);
tokio::spawn(async move {
while let Some(conn) = connecting_subscribers.next().await {
let subscriber_addr = conn.peer_addr().unwrap();
let tarpc::client::NewClient {
client: subscriber,
dispatch,
} = subscriber::SubscriberClient::new(client::Config::default(), conn);
let (ready_tx, ready) = oneshot::channel();
self.clone()
.start_subscriber_gc(subscriber_addr, dispatch, ready);
// Populate the topics
self.initialize_subscription(subscriber_addr, subscriber)
.await;
// Signal that initialization is done.
ready_tx.send(()).unwrap();
}
});
Ok(new_subscriber_addr)
}
async fn initialize_subscription(
&mut self,
subscriber_addr: SocketAddr,
mut subscriber: subscriber::SubscriberClient,
) {
// Populate the topics
if let Ok(topics) = subscriber.topics(context::current()).await {
self.clients.lock().unwrap().insert(
subscriber_addr,
Subscription {
subscriber: subscriber.clone(),
topics: topics.clone(),
},
);
info!("[{}] subscribed to topics: {:?}", subscriber_addr, topics);
let mut subscriptions = self.subscriptions.write().unwrap();
for topic in topics {
subscriptions
.entry(topic)
.or_insert_with(HashMap::new)
.insert(subscriber_addr, subscriber.clone());
}
}
}
fn start_subscriber_gc(
self,
subscriber_addr: SocketAddr,
client_dispatch: impl Future<Output = anyhow::Result<()>> + Send + 'static,
subscriber_ready: oneshot::Receiver<()>,
) {
tokio::spawn(async move {
if let Err(e) = client_dispatch.await {
info!(
"[{}] subscriber connection broken: {:?}",
subscriber_addr, e
)
}
// Don't clean up the subscriber until initialization is done.
let _ = subscriber_ready.await;
if let Some(subscription) = self.clients.lock().unwrap().remove(&subscriber_addr) {
info!(
"[{} unsubscribing from topics: {:?}",
subscriber_addr, subscription.topics
);
let mut subscriptions = self.subscriptions.write().unwrap();
for topic in subscription.topics {
let subscribers = subscriptions.get_mut(&topic).unwrap();
subscribers.remove(&subscriber_addr);
if subscribers.is_empty() {
subscriptions.remove(&topic);
}
}
}
});
}
}
#[tarpc::server]
impl publisher::Publisher for Publisher {
type BroadcastFut = Pin<Box<dyn Future<Output = ()> + Send>>;
fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut {
async fn broadcast(
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
message: String,
) {
let mut clients = clients.lock().unwrap().clone();
for client in clients.values_mut() {
// Ignore failing subscribers. In a real pubsub,
// you'd want to continually retry until subscribers
// ack.
let _ = client.receive(context::current(), message.clone()).await;
async fn publish(self, _: context::Context, topic: String, message: String) {
info!("received message to publish.");
let mut subscribers = match self.subscriptions.read().unwrap().get(&topic) {
None => return,
Some(subscriptions) => subscriptions.clone(),
};
let mut publications = Vec::new();
for client in subscribers.values_mut() {
publications.push(client.receive(context::current(), topic.clone(), message.clone()));
}
// Ignore failing subscribers. In a real pubsub, you'd want to continually retry until
// subscribers ack. Of course, a lot would be different in a real pubsub :)
for response in future::join_all(publications).await {
if let Err(e) = response {
info!("failed to broadcast to subscriber: {}", e);
}
}
broadcast(self.clients.clone(), message).boxed()
}
type SubscribeFut = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
async fn subscribe(
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
id: u32,
addr: SocketAddr,
) -> io::Result<()> {
let conn = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?;
let subscriber =
subscriber::SubscriberClient::new(client::Config::default(), conn).spawn()?;
eprintln!("Subscribing {}.", id);
clients.lock().unwrap().insert(id, subscriber);
Ok(())
}
subscribe(Arc::clone(&self.clients), id, addr)
.map_err(|e| e.to_string())
.boxed()
}
type UnsubscribeFut = Pin<Box<dyn Future<Output = ()> + Send>>;
fn unsubscribe(self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
eprintln!("Unsubscribing {}", id);
let mut clients = self.clients.lock().unwrap();
if clients.remove(&id).is_none() {
eprintln!(
"Client {} not found. Existings clients: {:?}",
id, &*clients
);
}
future::ready(()).boxed()
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
async fn main() -> anyhow::Result<()> {
env_logger::init();
let transport = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
.await?
.filter_map(|r| future::ready(r.ok()));
let publisher_addr = transport.get_ref().local_addr();
tokio::spawn(
transport
.take(1)
.map(server::BaseChannel::with_defaults)
.respond_with(Publisher::new().serve()),
);
let subscriber1 = Subscriber::listen(0, server::Config::default()).await?;
let subscriber2 = Subscriber::listen(1, server::Config::default()).await?;
let publisher_conn = tarpc::serde_transport::tcp::connect(publisher_addr, Json::default());
let publisher_conn = publisher_conn.await?;
let mut publisher =
publisher::PublisherClient::new(client::Config::default(), publisher_conn).spawn()?;
if let Err(e) = publisher
.subscribe(context::current(), 0, subscriber1)
.await?
{
eprintln!("Couldn't subscribe subscriber 0: {}", e);
}
if let Err(e) = publisher
.subscribe(context::current(), 1, subscriber2)
.await?
{
eprintln!("Couldn't subscribe subscriber 1: {}", e);
let clients = Arc::new(Mutex::new(HashMap::new()));
let addrs = Publisher {
clients,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
}
.start()
.await?;
println!("Broadcasting...");
publisher
.broadcast(context::current(), "hello to all".to_string())
.await?;
publisher.unsubscribe(context::current(), 1).await?;
publisher
.broadcast(context::current(), "hi again".to_string())
.await?;
drop(publisher);
let _subscriber0 = Subscriber::connect(
addrs.subscriptions,
vec!["calculus".into(), "cool shorts".into()],
)
.await?;
tokio::time::delay_for(Duration::from_millis(100)).await;
println!("Done.");
let _subscriber1 = Subscriber::connect(
addrs.subscriptions,
vec!["cool shorts".into(), "history".into()],
)
.await?;
let mut publisher = publisher::PublisherClient::new(
client::Config::default(),
tcp::connect(addrs.publisher, Json::default()).await?,
)
.spawn()?;
publisher
.publish(context::current(), "calculus".into(), "sqrt(2)".into())
.await?;
publisher
.publish(
context::current(),
"cool shorts".into(),
"hello to all".into(),
)
.await?;
publisher
.publish(context::current(), "history".into(), "napoleon".to_string())
.await?;
drop(_subscriber0);
publisher
.publish(
context::current(),
"cool shorts".into(),
"hello to who?".into(),
)
.await?;
info!("done.");
Ok(())
}

View File

@@ -5,11 +5,8 @@
// https://opensource.org/licenses/MIT.
use crate::{add::Add as AddService, double::Double as DoubleService};
use futures::{
future::{self, Ready},
prelude::*,
};
use std::{io, pin::Pin};
use futures::{future, prelude::*};
use std::io;
use tarpc::{
client, context,
server::{Handler, Server},
@@ -35,11 +32,10 @@ pub mod double {
#[derive(Clone)]
struct AddServer;
#[tarpc::server]
impl AddService for AddServer {
type AddFut = Ready<i32>;
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
future::ready(x + y)
async fn add(self, _: context::Context, x: i32, y: i32) -> i32 {
x + y
}
}
@@ -48,18 +44,13 @@ struct DoubleServer {
add_client: add::AddClient,
}
#[tarpc::server]
impl DoubleService for DoubleServer {
type DoubleFut = Pin<Box<dyn Future<Output = Result<i32, String>> + Send>>;
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
async fn double(mut client: add::AddClient, x: i32) -> Result<i32, String> {
client
.add(context::current(), x, x)
.await
.map_err(|e| e.to_string())
}
double(self.add_client.clone(), x).boxed()
async fn double(mut self, _: context::Context, x: i32) -> Result<i32, String> {
self.add_client
.add(context::current(), x, x)
.await
.map_err(|e| e.to_string())
}
}

View File

@@ -201,11 +201,13 @@
//! items expanded by a `service!` invocation.
#![deny(missing_docs)]
#![allow(clippy::type_complexity)]
#![cfg_attr(docsrs, feature(doc_cfg))]
pub mod rpc;
pub use rpc::*;
#[cfg(feature = "serde-transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
pub mod serde_transport;
pub mod trace;

View File

@@ -8,7 +8,7 @@ use crate::{
context,
trace::SpanId,
util::{Compact, TimeUntil},
ClientMessage, PollIo, Request, Response, Transport,
ClientMessage, PollContext, PollIo, Request, Response, Transport,
};
use fnv::FnvHashMap;
use futures::{
@@ -440,11 +440,22 @@ impl<Req, Resp, C> Future for RequestDispatch<Req, Resp, C>
where
C: Transport<ClientMessage<Req>, Response<Resp>>,
{
type Output = io::Result<()>;
type Output = anyhow::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<anyhow::Result<()>> {
loop {
match (self.as_mut().pump_read(cx)?, self.as_mut().pump_write(cx)?) {
match (
self.as_mut()
.pump_read(cx)
.context("failed to read from transport")?,
self.as_mut()
.pump_write(cx)
.context("failed to write to transport")?,
) {
(Poll::Ready(None), _) => {
info!("Shutdown: read half closed, so shutting down.");
return Poll::Ready(Ok(()));
}
(read, Poll::Ready(None)) => {
if self.as_mut().project().in_flight_requests.is_empty() {
info!("Shutdown: write half closed, and no requests in flight.");

View File

@@ -135,12 +135,14 @@ pub struct NewClient<C, D> {
pub dispatch: D,
}
impl<C, D> NewClient<C, D>
impl<C, D, E> NewClient<C, D>
where
D: Future<Output = io::Result<()>> + Send + 'static,
D: Future<Output = Result<(), E>> + Send + 'static,
E: std::fmt::Display,
{
/// Helper method to spawn the dispatch on the default executor.
#[cfg(feature = "tokio1")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
pub fn spawn(self) -> io::Result<C> {
use log::error;

View File

@@ -8,6 +8,7 @@
//! client to server and is used by the server to enforce response deadlines.
use crate::trace::{self, TraceId};
use static_assertions::assert_impl_all;
use std::time::{Duration, SystemTime};
/// A request context that carries request-scoped information like deadlines and trace information.
@@ -38,6 +39,8 @@ pub struct Context {
pub trace_context: trace::Context,
}
assert_impl_all!(Context: Send, Sync);
#[cfg(feature = "serde1")]
fn ten_seconds_from_now() -> SystemTime {
SystemTime::now() + Duration::from_secs(10)

View File

@@ -32,8 +32,9 @@ pub(crate) mod util;
pub use crate::{client::Client, server::Server, trace, transport::sealed::Transport};
use anyhow::Context as _;
use futures::task::*;
use std::{io, time::SystemTime};
use std::{fmt::Display, io, time::SystemTime};
/// A message from a client to a server.
#[derive(Debug)]
@@ -118,3 +119,30 @@ impl<T> Request<T> {
}
pub(crate) type PollIo<T> = Poll<Option<io::Result<T>>>;
pub(crate) trait PollContext<T> {
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static;
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C;
}
impl<T> PollContext<T> for PollIo<T> {
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
{
self.map(|o| o.map(|r| r.context(context)))
}
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C,
{
self.map(|o| o.map(|r| r.with_context(f)))
}
}

View File

@@ -144,8 +144,9 @@ where
ThrottlerStream::new(self, n)
}
/// Responds to all requests with `server`.
/// Responds to all requests with [`server::serve`](Serve).
#[cfg(feature = "tokio1")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
fn respond_with<S>(self, server: S) -> Running<Self, S>
where
S: Serve<C::Req, Resp = C::Resp>,
@@ -197,11 +198,16 @@ where
Self::new(Config::default(), transport)
}
/// Returns the inner transport.
/// Returns the inner transport over which messages are sent and received.
pub fn get_ref(&self) -> &T {
self.transport.get_ref()
}
/// Returns the inner transport over which messages are sent and received.
pub fn get_pin_ref(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project().transport.get_pin_mut()
}
fn cancel_request(mut self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
// It's possible the request was already completed, so it's fine
// if this is None.
@@ -400,6 +406,11 @@ where
C: Channel,
S: Serve<C::Req, Resp = C::Resp>,
{
/// Returns the inner channel over which messages are sent and received.
pub fn get_pin_channel(self: Pin<&mut Self>) -> Pin<&mut C> {
self.project().channel
}
fn pump_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -644,25 +655,26 @@ where
S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
S::Fut: Send + 'static,
{
/// Runs the client handler until completion by spawning each
/// Runs the client handler until completion by [spawning](tokio::spawn) each
/// request handler onto the default executor.
#[cfg(feature = "tokio1")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
pub fn execute(self) -> impl Future<Output = ()> {
use log::info;
self.try_for_each(|request_handler| async {
tokio::spawn(request_handler);
Ok(())
})
.unwrap_or_else(|e| info!("ClientHandler errored out: {}", e))
.map_ok(|()| log::info!("ClientHandler finished."))
.unwrap_or_else(|e| log::info!("ClientHandler errored out: {}", e))
}
}
/// A future that drives the server by spawning channels and request handlers on the default
/// A future that drives the server by [spawning](tokio::spawn) channels and request handlers on the default
/// executor.
#[pin_project]
#[derive(Debug)]
#[cfg(feature = "tokio1")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
pub struct Running<St, Se> {
#[pin]
incoming: St,
@@ -682,8 +694,6 @@ where
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
use log::info;
while let Some(channel) = ready!(self.as_mut().project().incoming.poll_next(cx)) {
tokio::spawn(
channel
@@ -691,7 +701,7 @@ where
.execute(),
);
}
info!("Server shutting down.");
log::info!("Server shutting down.");
Poll::Ready(())
}
}

View File

@@ -4,9 +4,9 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
//! Provides a [`Transport`] trait as well as implementations.
//! Provides a [`Transport`](sealed::Transport) trait as well as implementations.
//!
//! The rpc crate is transport- and protocol-agnostic. Any transport that impls [`Transport`]
//! The rpc crate is transport- and protocol-agnostic. Any transport that impls [`Transport`](sealed::Transport)
//! can be plugged in, using whatever protocol it wants.
use futures::prelude::*;

View File

@@ -11,6 +11,7 @@ use std::{
};
#[cfg(feature = "serde")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde")))]
pub mod serde;
/// Extension trait for [SystemTimes](SystemTime) in the future, i.e. deadlines.

View File

@@ -15,9 +15,10 @@ pub fn serialize_epoch_secs<S>(system_time: &SystemTime, serializer: S) -> Resul
where
S: Serializer,
{
const ZERO_SECS: Duration = Duration::from_secs(0);
system_time
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::from_secs(0))
.unwrap_or(ZERO_SECS)
.as_secs() // Only care about second precision
.serialize(serializer)
}

View File

@@ -16,13 +16,20 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_serde::{Framed as SerdeFramed, *};
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed};
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
/// A transport that serializes to, and deserializes from, a byte stream.
#[pin_project]
pub struct Transport<S, Item, SinkItem, Codec> {
#[pin]
inner: SerdeFramed<Framed<S, LengthDelimitedCodec>, Item, SinkItem, Codec>,
}
impl<S, Item, SinkItem, Codec> Transport<S, Item, SinkItem, Codec> {
/// Returns the inner transport over which messages are sent and received.
pub fn get_ref(&self) -> &S {
self.inner.get_ref().get_ref()
}
}
impl<S, Item, SinkItem, Codec, CodecError> Stream for Transport<S, Item, SinkItem, Codec>
where
S: AsyncWrite + AsyncRead,
@@ -175,7 +182,7 @@ pub mod tcp {
})
}
/// A [`TcpListener`] that wraps connections in JSON transports.
/// A [`TcpListener`] that wraps connections in [transports](Transport).
#[pin_project]
#[derive(Debug)]
pub struct Incoming<Item, SinkItem, Codec, CodecFn> {

View File

@@ -0,0 +1,5 @@
#[test]
fn ui() {
let t = trybuild::TestCases::new();
t.compile_fail("tests/compile_fail/*.rs");
}

View File

@@ -0,0 +1,15 @@
#[tarpc::service]
trait World {
async fn hello(name: String) -> String;
}
struct HelloServer;
#[tarpc::server]
impl World for HelloServer {
fn hello(name: String) -> String {
format!("Hello, {}!", name)
}
}
fn main() {}

View File

@@ -0,0 +1,19 @@
error: not all trait items implemented, missing: `HelloFut`
--> $DIR/tarpc_server_missing_async.rs:9:1
|
9 | impl World for HelloServer {
| ^^^^
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
--> $DIR/tarpc_server_missing_async.rs:10:5
|
10 | fn hello(name: String) -> String {
| ^^
error[E0433]: failed to resolve: use of undeclared type or module `serde`
--> $DIR/tarpc_server_missing_async.rs:1:1
|
1 | #[tarpc::service]
| ^^^^^^^^^^^^^^^^^ use of undeclared type or module `serde`
|
= note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

View File

@@ -0,0 +1,6 @@
#[tarpc::service]
trait World {
async fn pat((a, b): (u8, u32));
}
fn main() {}

View File

@@ -0,0 +1,5 @@
error: patterns aren't allowed in RPC args
--> $DIR/tarpc_service_arg_pat.rs:3:18
|
3 | async fn pat((a, b): (u8, u32));
| ^^^^^^

View File

@@ -0,0 +1,6 @@
#[tarpc::service]
trait World {
async fn new();
}
fn main() {}

View File

@@ -0,0 +1,5 @@
error: method name conflicts with generated fn `WorldClient::new`
--> $DIR/tarpc_service_fn_new.rs:3:14
|
3 | async fn new();
| ^^^

View File

@@ -0,0 +1,6 @@
#[tarpc::service]
trait World {
async fn serve();
}
fn main() {}

View File

@@ -0,0 +1,5 @@
error: method name conflicts with generated fn `World::serve`
--> $DIR/tarpc_service_fn_serve.rs:3:14
|
3 | async fn serve();
| ^^^^^

View File

@@ -1,6 +1,6 @@
use assert_matches::assert_matches;
use futures::{
future::{ready, Ready},
future::{join_all, ready, Ready},
prelude::*,
};
use std::io;
@@ -10,6 +10,7 @@ use tarpc::{
server::{self, BaseChannel, Channel, Handler},
transport::channel,
};
use tokio::join;
use tokio_serde::formats::Json;
#[tarpc_plugins::service]
@@ -110,3 +111,59 @@ async fn concurrent() -> io::Result<()> {
Ok(())
}
#[tokio::test(threaded_scheduler)]
async fn concurrent_join() -> io::Result<()> {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
tokio::spawn(
tarpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(Server.serve()),
);
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
let mut c = client.clone();
let req1 = c.add(context::current(), 1, 2);
let mut c = client.clone();
let req2 = c.add(context::current(), 3, 4);
let mut c = client.clone();
let req3 = c.hey(context::current(), "Tim".to_string());
let (resp1, resp2, resp3) = join!(req1, req2, req3);
assert_matches!(resp1, Ok(3));
assert_matches!(resp2, Ok(7));
assert_matches!(resp3, Ok(ref s) if s == "Hey, Tim.");
Ok(())
}
#[tokio::test(threaded_scheduler)]
async fn concurrent_join_all() -> io::Result<()> {
let _ = env_logger::try_init();
let (tx, rx) = channel::unbounded();
tokio::spawn(
tarpc::Server::default()
.incoming(stream::once(ready(rx)))
.respond_with(Server.serve()),
);
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
let mut c1 = client.clone();
let mut c2 = client.clone();
let req1 = c1.add(context::current(), 1, 2);
let req2 = c2.add(context::current(), 3, 4);
let responses = join_all(vec![req1, req2]).await;
assert_matches!(responses[0], Ok(3));
assert_matches!(responses[1], Ok(7));
Ok(())
}