mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3f34917c5 | ||
|
|
f65dd05949 | ||
|
|
240c436b34 | ||
|
|
c9803688cc | ||
|
|
4987094483 | ||
|
|
ff55080193 | ||
|
|
258193c932 | ||
|
|
67823ef5de | ||
|
|
a671457243 | ||
|
|
cf654549da | ||
|
|
6a01e32a2d | ||
|
|
e6597fab03 | ||
|
|
ebd245a93d | ||
|
|
3ebc3b5845 | ||
|
|
0e5973109d | ||
|
|
5f02d7383a | ||
|
|
2bae148529 | ||
|
|
42a2e03aab | ||
|
|
b566d0c646 | ||
|
|
b359f16767 | ||
|
|
f8681ab134 |
10
README.md
10
README.md
@@ -109,14 +109,10 @@ implement it for our Server struct.
|
||||
#[derive(Clone)]
|
||||
struct HelloServer;
|
||||
|
||||
#[tarpc::server]
|
||||
impl World for HelloServer {
|
||||
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
|
||||
// an associated type representing the future output by the fn.
|
||||
|
||||
type HelloFut = Ready<String>;
|
||||
|
||||
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
future::ready(format!("Hello, {}!", name))
|
||||
async fn hello(self, _: context::Context, name: String) -> String {
|
||||
format!("Hello, {}!", name)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
49
RELEASES.md
49
RELEASES.md
@@ -1,3 +1,52 @@
|
||||
## 0.21.1 (2020-08-02)
|
||||
|
||||
### New Features
|
||||
|
||||
#### #[tarpc::server] diagnostics
|
||||
|
||||
When a service impl uses #[tarpc::server], only `async fn`s are re-written. This can lead to
|
||||
confusing compiler errors about missing associated types:
|
||||
|
||||
```
|
||||
error: not all trait items implemented, missing: `HelloFut`
|
||||
--> $DIR/tarpc_server_missing_async.rs:9:1
|
||||
|
|
||||
9 | impl World for HelloServer {
|
||||
| ^^^^
|
||||
```
|
||||
|
||||
The proc macro now provides better diagnostics for this case:
|
||||
|
||||
```
|
||||
error: not all trait items implemented, missing: `HelloFut`
|
||||
--> $DIR/tarpc_server_missing_async.rs:9:1
|
||||
|
|
||||
9 | impl World for HelloServer {
|
||||
| ^^^^
|
||||
|
||||
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
|
||||
--> $DIR/tarpc_server_missing_async.rs:10:5
|
||||
|
|
||||
10 | fn hello(name: String) -> String {
|
||||
| ^^
|
||||
```
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
#### Fixed client hanging when server shuts down
|
||||
|
||||
Previously, clients would ignore when the read half of the transport was closed, continuing to
|
||||
write requests. This didn't make much sense, because without the ability to receive responses,
|
||||
clients have no way to know if requests were actually processed by the server. It basically just
|
||||
led to clients that would hang for a few seconds before shutting down. This has now been
|
||||
corrected: clients will immediately shut down when the read-half of the transport is closed.
|
||||
|
||||
#### More docs.rs documentation
|
||||
|
||||
Previously, docs.rs only documented items enabled by default, notably leaving out documentation
|
||||
for tokio and serde features. This has now been corrected: docs.rs should have documentation
|
||||
for all optional features.
|
||||
|
||||
## 0.21.0 (2020-06-26)
|
||||
|
||||
### New Features
|
||||
|
||||
@@ -11,6 +11,8 @@ use tokio_serde::formats::Json;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
let flags = App::new("Hello Client")
|
||||
.version("0.1")
|
||||
.author("Tim <tikue@google.com>")
|
||||
|
||||
@@ -5,10 +5,7 @@
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::{
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use futures::{future, prelude::*};
|
||||
use service::World;
|
||||
use std::{
|
||||
io,
|
||||
@@ -25,17 +22,10 @@ use tokio_serde::formats::Json;
|
||||
#[derive(Clone)]
|
||||
struct HelloServer(SocketAddr);
|
||||
|
||||
#[tarpc::server]
|
||||
impl World for HelloServer {
|
||||
// Each defined rpc generates two items in the trait, a fn that serves the RPC, and
|
||||
// an associated type representing the future output by the fn.
|
||||
|
||||
type HelloFut = Ready<String>;
|
||||
|
||||
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||
future::ready(format!(
|
||||
"Hello, {}! You are connected from {:?}.",
|
||||
name, self.0
|
||||
))
|
||||
async fn hello(self, _: context::Context, name: String) -> String {
|
||||
format!("Hello, {}! You are connected from {:?}.", name, self.0)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ extern crate quote;
|
||||
extern crate syn;
|
||||
|
||||
use proc_macro::TokenStream;
|
||||
use proc_macro2::TokenStream as TokenStream2;
|
||||
use proc_macro2::{Span, TokenStream as TokenStream2};
|
||||
use quote::{format_ident, quote, ToTokens};
|
||||
use syn::{
|
||||
braced,
|
||||
@@ -20,12 +20,24 @@ use syn::{
|
||||
parenthesized,
|
||||
parse::{Parse, ParseStream},
|
||||
parse_macro_input, parse_quote, parse_str,
|
||||
punctuated::Punctuated,
|
||||
spanned::Spanned,
|
||||
token::Comma,
|
||||
Attribute, FnArg, Ident, ImplItem, ImplItemMethod, ImplItemType, ItemImpl, Lit, LitBool,
|
||||
MetaNameValue, Pat, PatType, ReturnType, Token, Type, Visibility,
|
||||
};
|
||||
|
||||
/// Accumulates multiple errors into a result.
|
||||
/// Only use this for recoverable errors, i.e. non-parse errors. Fatal errors should early exit to
|
||||
/// avoid further complications.
|
||||
macro_rules! extend_errors {
|
||||
($errors: ident, $e: expr) => {
|
||||
match $errors {
|
||||
Ok(_) => $errors = Err($e),
|
||||
Err(ref mut errors) => errors.extend($e),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
struct Service {
|
||||
attrs: Vec<Attribute>,
|
||||
vis: Visibility,
|
||||
@@ -52,20 +64,31 @@ impl Parse for Service {
|
||||
while !content.is_empty() {
|
||||
rpcs.push(content.parse()?);
|
||||
}
|
||||
let mut ident_errors = Ok(());
|
||||
for rpc in &rpcs {
|
||||
if rpc.ident == "new" {
|
||||
return Err(input.error(format!(
|
||||
"method name conflicts with generated fn `{}Client::new`",
|
||||
ident.unraw()
|
||||
)));
|
||||
extend_errors!(
|
||||
ident_errors,
|
||||
syn::Error::new(
|
||||
rpc.ident.span(),
|
||||
format!(
|
||||
"method name conflicts with generated fn `{}Client::new`",
|
||||
ident.unraw()
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
if rpc.ident == "serve" {
|
||||
return Err(input.error(format!(
|
||||
"method name conflicts with generated fn `{}::serve`",
|
||||
ident
|
||||
)));
|
||||
extend_errors!(
|
||||
ident_errors,
|
||||
syn::Error::new(
|
||||
rpc.ident.span(),
|
||||
format!("method name conflicts with generated fn `{}::serve`", ident)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
ident_errors?;
|
||||
|
||||
Ok(Self {
|
||||
attrs,
|
||||
@@ -84,17 +107,28 @@ impl Parse for RpcMethod {
|
||||
let ident = input.parse()?;
|
||||
let content;
|
||||
parenthesized!(content in input);
|
||||
let args: Punctuated<FnArg, Comma> = content.parse_terminated(FnArg::parse)?;
|
||||
let args = args
|
||||
.into_iter()
|
||||
.map(|arg| match arg {
|
||||
FnArg::Typed(captured) => match *captured.pat {
|
||||
Pat::Ident(_) => Ok(captured),
|
||||
_ => Err(input.error("patterns aren't allowed in RPC args")),
|
||||
},
|
||||
FnArg::Receiver(_) => Err(input.error("method args cannot start with self")),
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
let mut args = Vec::new();
|
||||
let mut errors = Ok(());
|
||||
for arg in content.parse_terminated::<FnArg, Comma>(FnArg::parse)? {
|
||||
match arg {
|
||||
FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => {
|
||||
args.push(captured);
|
||||
}
|
||||
FnArg::Typed(captured) => {
|
||||
extend_errors!(
|
||||
errors,
|
||||
syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args")
|
||||
);
|
||||
}
|
||||
FnArg::Receiver(_) => {
|
||||
extend_errors!(
|
||||
errors,
|
||||
syn::Error::new(arg.span(), "method args cannot start with self")
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
errors?;
|
||||
let output = input.parse()?;
|
||||
input.parse::<Token![;]>()?;
|
||||
|
||||
@@ -113,32 +147,71 @@ struct DeriveSerde(bool);
|
||||
|
||||
impl Parse for DeriveSerde {
|
||||
fn parse(input: ParseStream) -> syn::Result<Self> {
|
||||
if input.is_empty() {
|
||||
return Ok(Self(cfg!(feature = "serde1")));
|
||||
}
|
||||
match input.parse::<MetaNameValue>()? {
|
||||
MetaNameValue {
|
||||
ref path, ref lit, ..
|
||||
} if path.segments.len() == 1
|
||||
&& path.segments.first().unwrap().ident == "derive_serde" =>
|
||||
{
|
||||
match lit {
|
||||
Lit::Bool(LitBool { value: true, .. }) if cfg!(feature = "serde1") => {
|
||||
Ok(Self(true))
|
||||
}
|
||||
Lit::Bool(LitBool { value: true, .. }) => {
|
||||
Err(input
|
||||
.error("To enable serde, first enable the `serde1` feature of tarpc"))
|
||||
}
|
||||
Lit::Bool(LitBool { value: false, .. }) => Ok(Self(false)),
|
||||
_ => Err(input.error("`derive_serde` expects a value of type `bool`")),
|
||||
let mut result = Ok(None);
|
||||
let mut derive_serde = Vec::new();
|
||||
let meta_items = input.parse_terminated::<MetaNameValue, Comma>(MetaNameValue::parse)?;
|
||||
for meta in meta_items {
|
||||
if meta.path.segments.len() != 1 {
|
||||
extend_errors!(
|
||||
result,
|
||||
syn::Error::new(
|
||||
meta.span(),
|
||||
"tarpc::service does not support this meta item"
|
||||
)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let segment = meta.path.segments.first().unwrap();
|
||||
if segment.ident != "derive_serde" {
|
||||
extend_errors!(
|
||||
result,
|
||||
syn::Error::new(
|
||||
meta.span(),
|
||||
"tarpc::service does not support this meta item"
|
||||
)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
match meta.lit {
|
||||
Lit::Bool(LitBool { value: true, .. }) if cfg!(feature = "serde1") => {
|
||||
result = result.and(Ok(Some(true)))
|
||||
}
|
||||
Lit::Bool(LitBool { value: true, .. }) => {
|
||||
extend_errors!(
|
||||
result,
|
||||
syn::Error::new(
|
||||
meta.span(),
|
||||
"To enable serde, first enable the `serde1` feature of tarpc"
|
||||
)
|
||||
);
|
||||
}
|
||||
Lit::Bool(LitBool { value: false, .. }) => result = result.and(Ok(Some(false))),
|
||||
_ => extend_errors!(
|
||||
result,
|
||||
syn::Error::new(
|
||||
meta.lit.span(),
|
||||
"`derive_serde` expects a value of type `bool`"
|
||||
)
|
||||
),
|
||||
}
|
||||
_ => {
|
||||
Err(input
|
||||
.error("tarpc::service only supports one meta item, `derive_serde = {bool}`"))
|
||||
derive_serde.push(meta);
|
||||
}
|
||||
if derive_serde.len() > 1 {
|
||||
for (i, derive_serde) in derive_serde.iter().enumerate() {
|
||||
extend_errors!(
|
||||
result,
|
||||
syn::Error::new(
|
||||
derive_serde.span(),
|
||||
format!(
|
||||
"`derive_serde` appears more than once (occurrence #{})",
|
||||
i + 1
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
let derive_serde = result?.unwrap_or(cfg!(feature = "serde1"));
|
||||
Ok(Self(derive_serde))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,6 +285,12 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
.into()
|
||||
}
|
||||
|
||||
/// generate an identifier consisting of the method name to CamelCase with
|
||||
/// Fut appended to it.
|
||||
fn associated_type_for_rpc(method: &ImplItemMethod) -> String {
|
||||
snake_to_camel(&method.sig.ident.unraw().to_string()) + "Fut"
|
||||
}
|
||||
|
||||
/// Transforms an async function into a sync one, returning a type declaration
|
||||
/// for the return type (a future).
|
||||
fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
|
||||
@@ -223,9 +302,7 @@ fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
|
||||
ReturnType::Type(_, ret) => quote!(#ret),
|
||||
};
|
||||
|
||||
// generate an identifier consisting of the method name to CamelCase with
|
||||
// Fut appended to it.
|
||||
let fut_name = snake_to_camel(&method.sig.ident.unraw().to_string()) + "Fut";
|
||||
let fut_name = associated_type_for_rpc(method);
|
||||
let fut_name_ident = Ident::new(&fut_name, method.sig.ident.span());
|
||||
|
||||
// generate the updated return signature.
|
||||
@@ -308,22 +385,37 @@ fn transform_method(method: &mut ImplItemMethod) -> ImplItemType {
|
||||
#[proc_macro_attribute]
|
||||
pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
let mut item = syn::parse_macro_input!(input as ItemImpl);
|
||||
let span = item.span();
|
||||
|
||||
// the generated type declarations
|
||||
let mut types: Vec<ImplItemType> = Vec::new();
|
||||
let mut expected_non_async_types: Vec<(&ImplItemMethod, String)> = Vec::new();
|
||||
let mut found_non_async_types: Vec<&ImplItemType> = Vec::new();
|
||||
|
||||
for inner in &mut item.items {
|
||||
if let ImplItem::Method(method) = inner {
|
||||
let sig = &method.sig;
|
||||
|
||||
// if this function is declared async, transform it into a regular function
|
||||
if sig.asyncness.is_some() {
|
||||
let typedecl = transform_method(method);
|
||||
types.push(typedecl);
|
||||
match inner {
|
||||
ImplItem::Method(method) => {
|
||||
if method.sig.asyncness.is_some() {
|
||||
// if this function is declared async, transform it into a regular function
|
||||
let typedecl = transform_method(method);
|
||||
types.push(typedecl);
|
||||
} else {
|
||||
// If it's not async, keep track of all required associated types for better
|
||||
// error reporting.
|
||||
expected_non_async_types.push((method, associated_type_for_rpc(method)));
|
||||
}
|
||||
}
|
||||
ImplItem::Type(typedecl) => found_non_async_types.push(typedecl),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) =
|
||||
verify_types_were_provided(span, &expected_non_async_types, &found_non_async_types)
|
||||
{
|
||||
return TokenStream::from(e.to_compile_error());
|
||||
}
|
||||
|
||||
// add the type declarations into the impl block
|
||||
for t in types.into_iter() {
|
||||
item.items.push(syn::ImplItem::Type(t));
|
||||
@@ -332,6 +424,39 @@ pub fn server(_attr: TokenStream, input: TokenStream) -> TokenStream {
|
||||
TokenStream::from(quote!(#item))
|
||||
}
|
||||
|
||||
fn verify_types_were_provided(
|
||||
span: Span,
|
||||
expected: &[(&ImplItemMethod, String)],
|
||||
provided: &[&ImplItemType],
|
||||
) -> syn::Result<()> {
|
||||
let mut result = Ok(());
|
||||
for (method, expected) in expected {
|
||||
if provided
|
||||
.iter()
|
||||
.find(|typedecl| typedecl.ident == expected)
|
||||
.is_none()
|
||||
{
|
||||
let mut e = syn::Error::new(
|
||||
span,
|
||||
format!("not all trait items implemented, missing: `{}`", expected),
|
||||
);
|
||||
let fn_span = method.sig.fn_token.span();
|
||||
e.extend(syn::Error::new(
|
||||
fn_span.join(method.sig.ident.span()).unwrap_or(fn_span),
|
||||
format!(
|
||||
"hint: `#[tarpc::server]` only rewrites async fns, and `fn {}` is not async",
|
||||
method.sig.ident
|
||||
),
|
||||
));
|
||||
match result {
|
||||
Ok(_) => result = Err(e),
|
||||
Err(ref mut error) => error.extend(Some(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
// Things needed to generate the service items: trait, serve impl, request/response enums, and
|
||||
// the client stub.
|
||||
struct ServiceGenerator<'a> {
|
||||
@@ -398,7 +523,7 @@ impl<'a> ServiceGenerator<'a> {
|
||||
#vis trait #service_ident: Clone {
|
||||
#( #types_and_fns )*
|
||||
|
||||
/// Returns a serving function to use with tarpc::server::Server.
|
||||
/// Returns a serving function to use with [tarpc::server::Channel::respond_with].
|
||||
fn serve(self) -> #server_ident<Self> {
|
||||
#server_ident { service: self }
|
||||
}
|
||||
@@ -412,6 +537,7 @@ impl<'a> ServiceGenerator<'a> {
|
||||
} = self;
|
||||
|
||||
quote! {
|
||||
/// A serving function to use with [tarpc::server::Channel::respond_with].
|
||||
#[derive(Clone)]
|
||||
#vis struct #server_ident<S> {
|
||||
service: S,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc"
|
||||
version = "0.21.0"
|
||||
version = "0.21.1"
|
||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
@@ -26,6 +26,7 @@ full = ["serde1", "tokio1", "serde-transport", "tcp"]
|
||||
travis-ci = { repository = "google/tarpc" }
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
fnv = "1.0"
|
||||
futures = "0.3"
|
||||
humantime = "1.0"
|
||||
@@ -34,20 +35,29 @@ pin-project = "0.4.17"
|
||||
rand = "0.7"
|
||||
tokio = { version = "0.2", features = ["time"] }
|
||||
serde = { optional = true, version = "1.0", features = ["derive"] }
|
||||
static_assertions = "1.1.0"
|
||||
tokio-util = { optional = true, version = "0.2" }
|
||||
tarpc-plugins = { path = "../plugins", version = "0.8" }
|
||||
tokio-serde = { optional = true, version = "0.6" }
|
||||
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.0"
|
||||
bincode = "1.3"
|
||||
bytes = { version = "0.5", features = ["serde"] }
|
||||
env_logger = "0.6"
|
||||
flate2 = "1.0.16"
|
||||
futures = "0.3"
|
||||
humantime = "1.0"
|
||||
log = "0.4"
|
||||
pin-utils = "0.1.0-alpha"
|
||||
serde_bytes = "0.11"
|
||||
tokio = { version = "0.2", features = ["full"] }
|
||||
tokio-serde = { version = "0.6", features = ["json"] }
|
||||
tokio-serde = { version = "0.6", features = ["json", "bincode"] }
|
||||
trybuild = "1.0"
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
all-features = true
|
||||
rustdoc-args = ["--cfg", "docsrs"]
|
||||
|
||||
[[example]]
|
||||
name = "server_calling_server"
|
||||
|
||||
130
tarpc/examples/compression.rs
Normal file
130
tarpc/examples/compression.rs
Normal file
@@ -0,0 +1,130 @@
|
||||
use flate2::{read::DeflateDecoder, write::DeflateEncoder, Compression};
|
||||
use futures::{Sink, SinkExt, Stream, StreamExt, TryStreamExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_bytes::ByteBuf;
|
||||
use std::{io, io::Read, io::Write};
|
||||
use tarpc::{
|
||||
client, context,
|
||||
serde_transport::tcp,
|
||||
server::{BaseChannel, Channel},
|
||||
};
|
||||
use tokio_serde::formats::Bincode;
|
||||
|
||||
/// Type of compression that should be enabled on the request. The transport is free to ignore this.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy, Deserialize, Serialize)]
|
||||
pub enum CompressionAlgorithm {
|
||||
Deflate,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub enum CompressedMessage<T> {
|
||||
Uncompressed(T),
|
||||
Compressed {
|
||||
algorithm: CompressionAlgorithm,
|
||||
payload: ByteBuf,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
enum CompressionType {
|
||||
Uncompressed,
|
||||
Compressed,
|
||||
}
|
||||
|
||||
async fn compress<T>(message: T) -> io::Result<CompressedMessage<T>>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let message = serialize(message)?;
|
||||
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::default());
|
||||
encoder.write_all(&message).unwrap();
|
||||
let compressed = encoder.finish()?;
|
||||
Ok(CompressedMessage::Compressed {
|
||||
algorithm: CompressionAlgorithm::Deflate,
|
||||
payload: ByteBuf::from(compressed),
|
||||
})
|
||||
}
|
||||
|
||||
async fn decompress<T>(message: CompressedMessage<T>) -> io::Result<T>
|
||||
where
|
||||
for<'a> T: Deserialize<'a>,
|
||||
{
|
||||
match message {
|
||||
CompressedMessage::Compressed { algorithm, payload } => {
|
||||
if algorithm != CompressionAlgorithm::Deflate {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("Compression algorithm {:?} not supported", algorithm),
|
||||
));
|
||||
}
|
||||
let mut deflater = DeflateDecoder::new(payload.as_slice());
|
||||
let mut payload = ByteBuf::new();
|
||||
deflater.read_to_end(&mut payload)?;
|
||||
let message = deserialize(payload)?;
|
||||
Ok(message)
|
||||
}
|
||||
CompressedMessage::Uncompressed(message) => Ok(message),
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize<T: Serialize>(t: T) -> io::Result<ByteBuf> {
|
||||
bincode::serialize(&t)
|
||||
.map(ByteBuf::from)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn deserialize<D>(message: ByteBuf) -> io::Result<D>
|
||||
where
|
||||
for<'a> D: Deserialize<'a>,
|
||||
{
|
||||
bincode::deserialize(message.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn add_compression<In, Out>(
|
||||
transport: impl Stream<Item = io::Result<CompressedMessage<In>>>
|
||||
+ Sink<CompressedMessage<Out>, Error = io::Error>,
|
||||
) -> impl Stream<Item = io::Result<In>> + Sink<Out, Error = io::Error>
|
||||
where
|
||||
Out: Serialize,
|
||||
for<'a> In: Deserialize<'a>,
|
||||
{
|
||||
transport.with(compress).and_then(decompress)
|
||||
}
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct HelloServer;
|
||||
|
||||
#[tarpc::server]
|
||||
impl World for HelloServer {
|
||||
async fn hello(self, _: context::Context, name: String) -> String {
|
||||
format!("Hey, {}!", name)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let mut incoming = tcp::listen("localhost:0", Bincode::default).await?;
|
||||
let addr = incoming.local_addr();
|
||||
tokio::spawn(async move {
|
||||
let transport = incoming.next().await.unwrap().unwrap();
|
||||
BaseChannel::with_defaults(add_compression(transport))
|
||||
.respond_with(HelloServer.serve())
|
||||
.execute()
|
||||
.await;
|
||||
});
|
||||
|
||||
let transport = tcp::connect(addr, Bincode::default()).await?;
|
||||
let mut client =
|
||||
WorldClient::new(client::Config::default(), add_compression(transport)).spawn()?;
|
||||
|
||||
println!(
|
||||
"{}",
|
||||
client.hello(context::current(), "friend".into()).await?
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -4,192 +4,341 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
/// - The PubSub server sets up TCP listeners on 2 ports, the "subscriber" port and the "publisher"
|
||||
/// port. Because both publishers and subscribers initiate their connections to the PubSub
|
||||
/// server, the server requires no prior knowledge of either publishers or subscribers.
|
||||
///
|
||||
/// - Subscribers connect to the server on the server's "subscriber" port. Once a connection is
|
||||
/// established, the server acts as the client of the Subscriber service, initially requesting
|
||||
/// the topics the subscriber is interested in, and subsequently sending topical messages to the
|
||||
/// subscriber.
|
||||
///
|
||||
/// - Publishers connect to the server on the "publisher" port and, once connected, they send
|
||||
/// topical messages via Publisher service to the server. The server then broadcasts each
|
||||
/// messages to all clients subscribed to the topic of that message.
|
||||
///
|
||||
/// Subscriber Publisher PubSub Server
|
||||
/// T1 | | |
|
||||
/// T2 |-----Connect------------------------------------------------------>|
|
||||
/// T3 | | |
|
||||
/// T2 |<-------------------------------------------------------Topics-----|
|
||||
/// T2 |-----(OK) Topics-------------------------------------------------->|
|
||||
/// T3 | | |
|
||||
/// T4 | |-----Connect-------------------->|
|
||||
/// T5 | | |
|
||||
/// T6 | |-----Publish-------------------->|
|
||||
/// T7 | | |
|
||||
/// T8 |<------------------------------------------------------Receive-----|
|
||||
/// T9 |-----(OK) Receive------------------------------------------------->|
|
||||
/// T10 | | |
|
||||
/// T11 | |<--------------(OK) Publish------|
|
||||
use anyhow::anyhow;
|
||||
use futures::{
|
||||
future::{self, Ready},
|
||||
channel::oneshot,
|
||||
future::{self, AbortHandle},
|
||||
prelude::*,
|
||||
Future,
|
||||
};
|
||||
use log::info;
|
||||
use publisher::Publisher as _;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
sync::{Arc, Mutex, RwLock},
|
||||
};
|
||||
use subscriber::Subscriber as _;
|
||||
use tarpc::{
|
||||
client, context,
|
||||
server::{self, Handler},
|
||||
serde_transport::tcp,
|
||||
server::{self, Channel},
|
||||
};
|
||||
use tokio::net::ToSocketAddrs;
|
||||
use tokio_serde::formats::Json;
|
||||
|
||||
pub mod subscriber {
|
||||
#[tarpc::service]
|
||||
pub trait Subscriber {
|
||||
async fn receive(message: String);
|
||||
async fn topics() -> Vec<String>;
|
||||
async fn receive(topic: String, message: String);
|
||||
}
|
||||
}
|
||||
|
||||
pub mod publisher {
|
||||
use std::net::SocketAddr;
|
||||
|
||||
#[tarpc::service]
|
||||
pub trait Publisher {
|
||||
async fn broadcast(message: String);
|
||||
async fn subscribe(id: u32, address: SocketAddr) -> Result<(), String>;
|
||||
async fn unsubscribe(id: u32);
|
||||
async fn publish(topic: String, message: String);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Subscriber {
|
||||
id: u32,
|
||||
local_addr: SocketAddr,
|
||||
topics: Vec<String>,
|
||||
}
|
||||
|
||||
#[tarpc::server]
|
||||
impl subscriber::Subscriber for Subscriber {
|
||||
type ReceiveFut = Ready<()>;
|
||||
async fn topics(self, _: context::Context) -> Vec<String> {
|
||||
self.topics.clone()
|
||||
}
|
||||
|
||||
fn receive(self, _: context::Context, message: String) -> Self::ReceiveFut {
|
||||
eprintln!("{} received message: {}", self.id, message);
|
||||
future::ready(())
|
||||
async fn receive(self, _: context::Context, topic: String, message: String) {
|
||||
info!(
|
||||
"[{}] received message on topic '{}': {}",
|
||||
self.local_addr, topic, message
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
struct SubscriberHandle(AbortHandle);
|
||||
|
||||
impl Drop for SubscriberHandle {
|
||||
fn drop(&mut self) {
|
||||
self.0.abort();
|
||||
}
|
||||
}
|
||||
|
||||
impl Subscriber {
|
||||
async fn listen(id: u32, config: server::Config) -> io::Result<SocketAddr> {
|
||||
let incoming = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
|
||||
.await?
|
||||
.filter_map(|r| future::ready(r.ok()));
|
||||
let addr = incoming.get_ref().local_addr();
|
||||
tokio::spawn(
|
||||
server::new(config)
|
||||
.incoming(incoming)
|
||||
.take(1)
|
||||
.respond_with(Subscriber { id }.serve()),
|
||||
);
|
||||
Ok(addr)
|
||||
async fn connect(
|
||||
publisher_addr: impl ToSocketAddrs,
|
||||
topics: Vec<String>,
|
||||
) -> anyhow::Result<SubscriberHandle> {
|
||||
let publisher = tcp::connect(publisher_addr, Json::default()).await?;
|
||||
let local_addr = publisher.local_addr()?;
|
||||
let mut handler = server::BaseChannel::with_defaults(publisher)
|
||||
.respond_with(Subscriber { local_addr, topics }.serve());
|
||||
// The first request is for the topics being subscriibed to.
|
||||
match handler.next().await {
|
||||
Some(init_topics) => init_topics?.await,
|
||||
None => {
|
||||
return Err(anyhow!(
|
||||
"[{}] Server never initialized the subscriber.",
|
||||
local_addr
|
||||
))
|
||||
}
|
||||
};
|
||||
let (handler, abort_handle) = future::abortable(handler.execute());
|
||||
tokio::spawn(async move {
|
||||
match handler.await {
|
||||
Ok(()) | Err(future::Aborted) => info!("[{}] subscriber shutdown.", local_addr),
|
||||
}
|
||||
});
|
||||
Ok(SubscriberHandle(abort_handle))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Subscription {
|
||||
subscriber: subscriber::SubscriberClient,
|
||||
topics: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Publisher {
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
|
||||
clients: Arc<Mutex<HashMap<SocketAddr, Subscription>>>,
|
||||
subscriptions: Arc<RwLock<HashMap<String, HashMap<SocketAddr, subscriber::SubscriberClient>>>>,
|
||||
}
|
||||
|
||||
struct PublisherAddrs {
|
||||
publisher: SocketAddr,
|
||||
subscriptions: SocketAddr,
|
||||
}
|
||||
|
||||
impl Publisher {
|
||||
fn new() -> Publisher {
|
||||
Publisher {
|
||||
clients: Arc::new(Mutex::new(HashMap::new())),
|
||||
async fn start(self) -> io::Result<PublisherAddrs> {
|
||||
let mut connecting_publishers = tcp::listen("localhost:0", Json::default).await?;
|
||||
|
||||
let publisher_addrs = PublisherAddrs {
|
||||
publisher: connecting_publishers.local_addr(),
|
||||
subscriptions: self.clone().start_subscription_manager().await?,
|
||||
};
|
||||
|
||||
info!("[{}] listening for publishers.", publisher_addrs.publisher);
|
||||
tokio::spawn(async move {
|
||||
// Because this is just an example, we know there will only be one publisher. In more
|
||||
// realistic code, this would be a loop to continually accept new publisher
|
||||
// connections.
|
||||
let publisher = connecting_publishers.next().await.unwrap().unwrap();
|
||||
info!("[{}] publisher connected.", publisher.peer_addr().unwrap());
|
||||
|
||||
server::BaseChannel::with_defaults(publisher)
|
||||
.respond_with(self.serve())
|
||||
.execute()
|
||||
.await
|
||||
});
|
||||
|
||||
Ok(publisher_addrs)
|
||||
}
|
||||
|
||||
async fn start_subscription_manager(mut self) -> io::Result<SocketAddr> {
|
||||
let mut connecting_subscribers = tcp::listen("localhost:0", Json::default)
|
||||
.await?
|
||||
.filter_map(|r| future::ready(r.ok()));
|
||||
let new_subscriber_addr = connecting_subscribers.get_ref().local_addr();
|
||||
info!("[{}] listening for subscribers.", new_subscriber_addr);
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(conn) = connecting_subscribers.next().await {
|
||||
let subscriber_addr = conn.peer_addr().unwrap();
|
||||
|
||||
let tarpc::client::NewClient {
|
||||
client: subscriber,
|
||||
dispatch,
|
||||
} = subscriber::SubscriberClient::new(client::Config::default(), conn);
|
||||
let (ready_tx, ready) = oneshot::channel();
|
||||
self.clone()
|
||||
.start_subscriber_gc(subscriber_addr, dispatch, ready);
|
||||
|
||||
// Populate the topics
|
||||
self.initialize_subscription(subscriber_addr, subscriber)
|
||||
.await;
|
||||
|
||||
// Signal that initialization is done.
|
||||
ready_tx.send(()).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
Ok(new_subscriber_addr)
|
||||
}
|
||||
|
||||
async fn initialize_subscription(
|
||||
&mut self,
|
||||
subscriber_addr: SocketAddr,
|
||||
mut subscriber: subscriber::SubscriberClient,
|
||||
) {
|
||||
// Populate the topics
|
||||
if let Ok(topics) = subscriber.topics(context::current()).await {
|
||||
self.clients.lock().unwrap().insert(
|
||||
subscriber_addr,
|
||||
Subscription {
|
||||
subscriber: subscriber.clone(),
|
||||
topics: topics.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
info!("[{}] subscribed to topics: {:?}", subscriber_addr, topics);
|
||||
let mut subscriptions = self.subscriptions.write().unwrap();
|
||||
for topic in topics {
|
||||
subscriptions
|
||||
.entry(topic)
|
||||
.or_insert_with(HashMap::new)
|
||||
.insert(subscriber_addr, subscriber.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start_subscriber_gc(
|
||||
self,
|
||||
subscriber_addr: SocketAddr,
|
||||
client_dispatch: impl Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
subscriber_ready: oneshot::Receiver<()>,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = client_dispatch.await {
|
||||
info!(
|
||||
"[{}] subscriber connection broken: {:?}",
|
||||
subscriber_addr, e
|
||||
)
|
||||
}
|
||||
// Don't clean up the subscriber until initialization is done.
|
||||
let _ = subscriber_ready.await;
|
||||
if let Some(subscription) = self.clients.lock().unwrap().remove(&subscriber_addr) {
|
||||
info!(
|
||||
"[{} unsubscribing from topics: {:?}",
|
||||
subscriber_addr, subscription.topics
|
||||
);
|
||||
let mut subscriptions = self.subscriptions.write().unwrap();
|
||||
for topic in subscription.topics {
|
||||
let subscribers = subscriptions.get_mut(&topic).unwrap();
|
||||
subscribers.remove(&subscriber_addr);
|
||||
if subscribers.is_empty() {
|
||||
subscriptions.remove(&topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[tarpc::server]
|
||||
impl publisher::Publisher for Publisher {
|
||||
type BroadcastFut = Pin<Box<dyn Future<Output = ()> + Send>>;
|
||||
|
||||
fn broadcast(self, _: context::Context, message: String) -> Self::BroadcastFut {
|
||||
async fn broadcast(
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
|
||||
message: String,
|
||||
) {
|
||||
let mut clients = clients.lock().unwrap().clone();
|
||||
for client in clients.values_mut() {
|
||||
// Ignore failing subscribers. In a real pubsub,
|
||||
// you'd want to continually retry until subscribers
|
||||
// ack.
|
||||
let _ = client.receive(context::current(), message.clone()).await;
|
||||
async fn publish(self, _: context::Context, topic: String, message: String) {
|
||||
info!("received message to publish.");
|
||||
let mut subscribers = match self.subscriptions.read().unwrap().get(&topic) {
|
||||
None => return,
|
||||
Some(subscriptions) => subscriptions.clone(),
|
||||
};
|
||||
let mut publications = Vec::new();
|
||||
for client in subscribers.values_mut() {
|
||||
publications.push(client.receive(context::current(), topic.clone(), message.clone()));
|
||||
}
|
||||
// Ignore failing subscribers. In a real pubsub, you'd want to continually retry until
|
||||
// subscribers ack. Of course, a lot would be different in a real pubsub :)
|
||||
for response in future::join_all(publications).await {
|
||||
if let Err(e) = response {
|
||||
info!("failed to broadcast to subscriber: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
broadcast(self.clients.clone(), message).boxed()
|
||||
}
|
||||
|
||||
type SubscribeFut = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
|
||||
|
||||
fn subscribe(self, _: context::Context, id: u32, addr: SocketAddr) -> Self::SubscribeFut {
|
||||
async fn subscribe(
|
||||
clients: Arc<Mutex<HashMap<u32, subscriber::SubscriberClient>>>,
|
||||
id: u32,
|
||||
addr: SocketAddr,
|
||||
) -> io::Result<()> {
|
||||
let conn = tarpc::serde_transport::tcp::connect(addr, Json::default()).await?;
|
||||
let subscriber =
|
||||
subscriber::SubscriberClient::new(client::Config::default(), conn).spawn()?;
|
||||
eprintln!("Subscribing {}.", id);
|
||||
clients.lock().unwrap().insert(id, subscriber);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
subscribe(Arc::clone(&self.clients), id, addr)
|
||||
.map_err(|e| e.to_string())
|
||||
.boxed()
|
||||
}
|
||||
|
||||
type UnsubscribeFut = Pin<Box<dyn Future<Output = ()> + Send>>;
|
||||
|
||||
fn unsubscribe(self, _: context::Context, id: u32) -> Self::UnsubscribeFut {
|
||||
eprintln!("Unsubscribing {}", id);
|
||||
let mut clients = self.clients.lock().unwrap();
|
||||
if clients.remove(&id).is_none() {
|
||||
eprintln!(
|
||||
"Client {} not found. Existings clients: {:?}",
|
||||
id, &*clients
|
||||
);
|
||||
}
|
||||
future::ready(()).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
let transport = tarpc::serde_transport::tcp::listen("localhost:0", Json::default)
|
||||
.await?
|
||||
.filter_map(|r| future::ready(r.ok()));
|
||||
let publisher_addr = transport.get_ref().local_addr();
|
||||
tokio::spawn(
|
||||
transport
|
||||
.take(1)
|
||||
.map(server::BaseChannel::with_defaults)
|
||||
.respond_with(Publisher::new().serve()),
|
||||
);
|
||||
|
||||
let subscriber1 = Subscriber::listen(0, server::Config::default()).await?;
|
||||
let subscriber2 = Subscriber::listen(1, server::Config::default()).await?;
|
||||
|
||||
let publisher_conn = tarpc::serde_transport::tcp::connect(publisher_addr, Json::default());
|
||||
let publisher_conn = publisher_conn.await?;
|
||||
let mut publisher =
|
||||
publisher::PublisherClient::new(client::Config::default(), publisher_conn).spawn()?;
|
||||
|
||||
if let Err(e) = publisher
|
||||
.subscribe(context::current(), 0, subscriber1)
|
||||
.await?
|
||||
{
|
||||
eprintln!("Couldn't subscribe subscriber 0: {}", e);
|
||||
}
|
||||
if let Err(e) = publisher
|
||||
.subscribe(context::current(), 1, subscriber2)
|
||||
.await?
|
||||
{
|
||||
eprintln!("Couldn't subscribe subscriber 1: {}", e);
|
||||
let clients = Arc::new(Mutex::new(HashMap::new()));
|
||||
let addrs = Publisher {
|
||||
clients,
|
||||
subscriptions: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
.start()
|
||||
.await?;
|
||||
|
||||
println!("Broadcasting...");
|
||||
publisher
|
||||
.broadcast(context::current(), "hello to all".to_string())
|
||||
.await?;
|
||||
publisher.unsubscribe(context::current(), 1).await?;
|
||||
publisher
|
||||
.broadcast(context::current(), "hi again".to_string())
|
||||
.await?;
|
||||
drop(publisher);
|
||||
let _subscriber0 = Subscriber::connect(
|
||||
addrs.subscriptions,
|
||||
vec!["calculus".into(), "cool shorts".into()],
|
||||
)
|
||||
.await?;
|
||||
|
||||
tokio::time::delay_for(Duration::from_millis(100)).await;
|
||||
println!("Done.");
|
||||
let _subscriber1 = Subscriber::connect(
|
||||
addrs.subscriptions,
|
||||
vec!["cool shorts".into(), "history".into()],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut publisher = publisher::PublisherClient::new(
|
||||
client::Config::default(),
|
||||
tcp::connect(addrs.publisher, Json::default()).await?,
|
||||
)
|
||||
.spawn()?;
|
||||
|
||||
publisher
|
||||
.publish(context::current(), "calculus".into(), "sqrt(2)".into())
|
||||
.await?;
|
||||
|
||||
publisher
|
||||
.publish(
|
||||
context::current(),
|
||||
"cool shorts".into(),
|
||||
"hello to all".into(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
publisher
|
||||
.publish(context::current(), "history".into(), "napoleon".to_string())
|
||||
.await?;
|
||||
|
||||
drop(_subscriber0);
|
||||
|
||||
publisher
|
||||
.publish(
|
||||
context::current(),
|
||||
"cool shorts".into(),
|
||||
"hello to who?".into(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("done.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5,11 +5,8 @@
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
use crate::{add::Add as AddService, double::Double as DoubleService};
|
||||
use futures::{
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use std::{io, pin::Pin};
|
||||
use futures::{future, prelude::*};
|
||||
use std::io;
|
||||
use tarpc::{
|
||||
client, context,
|
||||
server::{Handler, Server},
|
||||
@@ -35,11 +32,10 @@ pub mod double {
|
||||
#[derive(Clone)]
|
||||
struct AddServer;
|
||||
|
||||
#[tarpc::server]
|
||||
impl AddService for AddServer {
|
||||
type AddFut = Ready<i32>;
|
||||
|
||||
fn add(self, _: context::Context, x: i32, y: i32) -> Self::AddFut {
|
||||
future::ready(x + y)
|
||||
async fn add(self, _: context::Context, x: i32, y: i32) -> i32 {
|
||||
x + y
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,18 +44,13 @@ struct DoubleServer {
|
||||
add_client: add::AddClient,
|
||||
}
|
||||
|
||||
#[tarpc::server]
|
||||
impl DoubleService for DoubleServer {
|
||||
type DoubleFut = Pin<Box<dyn Future<Output = Result<i32, String>> + Send>>;
|
||||
|
||||
fn double(self, _: context::Context, x: i32) -> Self::DoubleFut {
|
||||
async fn double(mut client: add::AddClient, x: i32) -> Result<i32, String> {
|
||||
client
|
||||
.add(context::current(), x, x)
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
double(self.add_client.clone(), x).boxed()
|
||||
async fn double(mut self, _: context::Context, x: i32) -> Result<i32, String> {
|
||||
self.add_client
|
||||
.add(context::current(), x, x)
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -201,11 +201,13 @@
|
||||
//! items expanded by a `service!` invocation.
|
||||
#![deny(missing_docs)]
|
||||
#![allow(clippy::type_complexity)]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))]
|
||||
|
||||
pub mod rpc;
|
||||
pub use rpc::*;
|
||||
|
||||
#[cfg(feature = "serde-transport")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
|
||||
pub mod serde_transport;
|
||||
|
||||
pub mod trace;
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::{
|
||||
context,
|
||||
trace::SpanId,
|
||||
util::{Compact, TimeUntil},
|
||||
ClientMessage, PollIo, Request, Response, Transport,
|
||||
ClientMessage, PollContext, PollIo, Request, Response, Transport,
|
||||
};
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{
|
||||
@@ -440,11 +440,22 @@ impl<Req, Resp, C> Future for RequestDispatch<Req, Resp, C>
|
||||
where
|
||||
C: Transport<ClientMessage<Req>, Response<Resp>>,
|
||||
{
|
||||
type Output = io::Result<()>;
|
||||
type Output = anyhow::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<anyhow::Result<()>> {
|
||||
loop {
|
||||
match (self.as_mut().pump_read(cx)?, self.as_mut().pump_write(cx)?) {
|
||||
match (
|
||||
self.as_mut()
|
||||
.pump_read(cx)
|
||||
.context("failed to read from transport")?,
|
||||
self.as_mut()
|
||||
.pump_write(cx)
|
||||
.context("failed to write to transport")?,
|
||||
) {
|
||||
(Poll::Ready(None), _) => {
|
||||
info!("Shutdown: read half closed, so shutting down.");
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
(read, Poll::Ready(None)) => {
|
||||
if self.as_mut().project().in_flight_requests.is_empty() {
|
||||
info!("Shutdown: write half closed, and no requests in flight.");
|
||||
|
||||
@@ -135,12 +135,14 @@ pub struct NewClient<C, D> {
|
||||
pub dispatch: D,
|
||||
}
|
||||
|
||||
impl<C, D> NewClient<C, D>
|
||||
impl<C, D, E> NewClient<C, D>
|
||||
where
|
||||
D: Future<Output = io::Result<()>> + Send + 'static,
|
||||
D: Future<Output = Result<(), E>> + Send + 'static,
|
||||
E: std::fmt::Display,
|
||||
{
|
||||
/// Helper method to spawn the dispatch on the default executor.
|
||||
#[cfg(feature = "tokio1")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
|
||||
pub fn spawn(self) -> io::Result<C> {
|
||||
use log::error;
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
//! client to server and is used by the server to enforce response deadlines.
|
||||
|
||||
use crate::trace::{self, TraceId};
|
||||
use static_assertions::assert_impl_all;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
/// A request context that carries request-scoped information like deadlines and trace information.
|
||||
@@ -38,6 +39,8 @@ pub struct Context {
|
||||
pub trace_context: trace::Context,
|
||||
}
|
||||
|
||||
assert_impl_all!(Context: Send, Sync);
|
||||
|
||||
#[cfg(feature = "serde1")]
|
||||
fn ten_seconds_from_now() -> SystemTime {
|
||||
SystemTime::now() + Duration::from_secs(10)
|
||||
|
||||
@@ -32,8 +32,9 @@ pub(crate) mod util;
|
||||
|
||||
pub use crate::{client::Client, server::Server, trace, transport::sealed::Transport};
|
||||
|
||||
use anyhow::Context as _;
|
||||
use futures::task::*;
|
||||
use std::{io, time::SystemTime};
|
||||
use std::{fmt::Display, io, time::SystemTime};
|
||||
|
||||
/// A message from a client to a server.
|
||||
#[derive(Debug)]
|
||||
@@ -118,3 +119,30 @@ impl<T> Request<T> {
|
||||
}
|
||||
|
||||
pub(crate) type PollIo<T> = Poll<Option<io::Result<T>>>;
|
||||
pub(crate) trait PollContext<T> {
|
||||
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
|
||||
where
|
||||
C: Display + Send + Sync + 'static;
|
||||
|
||||
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
|
||||
where
|
||||
C: Display + Send + Sync + 'static,
|
||||
F: FnOnce() -> C;
|
||||
}
|
||||
|
||||
impl<T> PollContext<T> for PollIo<T> {
|
||||
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
|
||||
where
|
||||
C: Display + Send + Sync + 'static,
|
||||
{
|
||||
self.map(|o| o.map(|r| r.context(context)))
|
||||
}
|
||||
|
||||
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
|
||||
where
|
||||
C: Display + Send + Sync + 'static,
|
||||
F: FnOnce() -> C,
|
||||
{
|
||||
self.map(|o| o.map(|r| r.with_context(f)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,8 +144,9 @@ where
|
||||
ThrottlerStream::new(self, n)
|
||||
}
|
||||
|
||||
/// Responds to all requests with `server`.
|
||||
/// Responds to all requests with [`server::serve`](Serve).
|
||||
#[cfg(feature = "tokio1")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
|
||||
fn respond_with<S>(self, server: S) -> Running<Self, S>
|
||||
where
|
||||
S: Serve<C::Req, Resp = C::Resp>,
|
||||
@@ -197,11 +198,16 @@ where
|
||||
Self::new(Config::default(), transport)
|
||||
}
|
||||
|
||||
/// Returns the inner transport.
|
||||
/// Returns the inner transport over which messages are sent and received.
|
||||
pub fn get_ref(&self) -> &T {
|
||||
self.transport.get_ref()
|
||||
}
|
||||
|
||||
/// Returns the inner transport over which messages are sent and received.
|
||||
pub fn get_pin_ref(self: Pin<&mut Self>) -> Pin<&mut T> {
|
||||
self.project().transport.get_pin_mut()
|
||||
}
|
||||
|
||||
fn cancel_request(mut self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
|
||||
// It's possible the request was already completed, so it's fine
|
||||
// if this is None.
|
||||
@@ -400,6 +406,11 @@ where
|
||||
C: Channel,
|
||||
S: Serve<C::Req, Resp = C::Resp>,
|
||||
{
|
||||
/// Returns the inner channel over which messages are sent and received.
|
||||
pub fn get_pin_channel(self: Pin<&mut Self>) -> Pin<&mut C> {
|
||||
self.project().channel
|
||||
}
|
||||
|
||||
fn pump_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
@@ -644,25 +655,26 @@ where
|
||||
S: Serve<C::Req, Resp = C::Resp> + Send + 'static,
|
||||
S::Fut: Send + 'static,
|
||||
{
|
||||
/// Runs the client handler until completion by spawning each
|
||||
/// Runs the client handler until completion by [spawning](tokio::spawn) each
|
||||
/// request handler onto the default executor.
|
||||
#[cfg(feature = "tokio1")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
|
||||
pub fn execute(self) -> impl Future<Output = ()> {
|
||||
use log::info;
|
||||
|
||||
self.try_for_each(|request_handler| async {
|
||||
tokio::spawn(request_handler);
|
||||
Ok(())
|
||||
})
|
||||
.unwrap_or_else(|e| info!("ClientHandler errored out: {}", e))
|
||||
.map_ok(|()| log::info!("ClientHandler finished."))
|
||||
.unwrap_or_else(|e| log::info!("ClientHandler errored out: {}", e))
|
||||
}
|
||||
}
|
||||
|
||||
/// A future that drives the server by spawning channels and request handlers on the default
|
||||
/// A future that drives the server by [spawning](tokio::spawn) channels and request handlers on the default
|
||||
/// executor.
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
#[cfg(feature = "tokio1")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
|
||||
pub struct Running<St, Se> {
|
||||
#[pin]
|
||||
incoming: St,
|
||||
@@ -682,8 +694,6 @@ where
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
||||
use log::info;
|
||||
|
||||
while let Some(channel) = ready!(self.as_mut().project().incoming.poll_next(cx)) {
|
||||
tokio::spawn(
|
||||
channel
|
||||
@@ -691,7 +701,7 @@ where
|
||||
.execute(),
|
||||
);
|
||||
}
|
||||
info!("Server shutting down.");
|
||||
log::info!("Server shutting down.");
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://opensource.org/licenses/MIT.
|
||||
|
||||
//! Provides a [`Transport`] trait as well as implementations.
|
||||
//! Provides a [`Transport`](sealed::Transport) trait as well as implementations.
|
||||
//!
|
||||
//! The rpc crate is transport- and protocol-agnostic. Any transport that impls [`Transport`]
|
||||
//! The rpc crate is transport- and protocol-agnostic. Any transport that impls [`Transport`](sealed::Transport)
|
||||
//! can be plugged in, using whatever protocol it wants.
|
||||
|
||||
use futures::prelude::*;
|
||||
|
||||
@@ -11,6 +11,7 @@ use std::{
|
||||
};
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "serde")))]
|
||||
pub mod serde;
|
||||
|
||||
/// Extension trait for [SystemTimes](SystemTime) in the future, i.e. deadlines.
|
||||
|
||||
@@ -15,9 +15,10 @@ pub fn serialize_epoch_secs<S>(system_time: &SystemTime, serializer: S) -> Resul
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
const ZERO_SECS: Duration = Duration::from_secs(0);
|
||||
system_time
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or(Duration::from_secs(0))
|
||||
.unwrap_or(ZERO_SECS)
|
||||
.as_secs() // Only care about second precision
|
||||
.serialize(serializer)
|
||||
}
|
||||
|
||||
@@ -16,13 +16,20 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_serde::{Framed as SerdeFramed, *};
|
||||
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed};
|
||||
|
||||
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
|
||||
/// A transport that serializes to, and deserializes from, a byte stream.
|
||||
#[pin_project]
|
||||
pub struct Transport<S, Item, SinkItem, Codec> {
|
||||
#[pin]
|
||||
inner: SerdeFramed<Framed<S, LengthDelimitedCodec>, Item, SinkItem, Codec>,
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem, Codec> Transport<S, Item, SinkItem, Codec> {
|
||||
/// Returns the inner transport over which messages are sent and received.
|
||||
pub fn get_ref(&self) -> &S {
|
||||
self.inner.get_ref().get_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Item, SinkItem, Codec, CodecError> Stream for Transport<S, Item, SinkItem, Codec>
|
||||
where
|
||||
S: AsyncWrite + AsyncRead,
|
||||
@@ -175,7 +182,7 @@ pub mod tcp {
|
||||
})
|
||||
}
|
||||
|
||||
/// A [`TcpListener`] that wraps connections in JSON transports.
|
||||
/// A [`TcpListener`] that wraps connections in [transports](Transport).
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct Incoming<Item, SinkItem, Codec, CodecFn> {
|
||||
|
||||
5
tarpc/tests/compile_fail.rs
Normal file
5
tarpc/tests/compile_fail.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
#[test]
|
||||
fn ui() {
|
||||
let t = trybuild::TestCases::new();
|
||||
t.compile_fail("tests/compile_fail/*.rs");
|
||||
}
|
||||
15
tarpc/tests/compile_fail/tarpc_server_missing_async.rs
Normal file
15
tarpc/tests/compile_fail/tarpc_server_missing_async.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn hello(name: String) -> String;
|
||||
}
|
||||
|
||||
struct HelloServer;
|
||||
|
||||
#[tarpc::server]
|
||||
impl World for HelloServer {
|
||||
fn hello(name: String) -> String {
|
||||
format!("Hello, {}!", name)
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {}
|
||||
19
tarpc/tests/compile_fail/tarpc_server_missing_async.stderr
Normal file
19
tarpc/tests/compile_fail/tarpc_server_missing_async.stderr
Normal file
@@ -0,0 +1,19 @@
|
||||
error: not all trait items implemented, missing: `HelloFut`
|
||||
--> $DIR/tarpc_server_missing_async.rs:9:1
|
||||
|
|
||||
9 | impl World for HelloServer {
|
||||
| ^^^^
|
||||
|
||||
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
|
||||
--> $DIR/tarpc_server_missing_async.rs:10:5
|
||||
|
|
||||
10 | fn hello(name: String) -> String {
|
||||
| ^^
|
||||
|
||||
error[E0433]: failed to resolve: use of undeclared type or module `serde`
|
||||
--> $DIR/tarpc_server_missing_async.rs:1:1
|
||||
|
|
||||
1 | #[tarpc::service]
|
||||
| ^^^^^^^^^^^^^^^^^ use of undeclared type or module `serde`
|
||||
|
|
||||
= note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)
|
||||
6
tarpc/tests/compile_fail/tarpc_service_arg_pat.rs
Normal file
6
tarpc/tests/compile_fail/tarpc_service_arg_pat.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn pat((a, b): (u8, u32));
|
||||
}
|
||||
|
||||
fn main() {}
|
||||
5
tarpc/tests/compile_fail/tarpc_service_arg_pat.stderr
Normal file
5
tarpc/tests/compile_fail/tarpc_service_arg_pat.stderr
Normal file
@@ -0,0 +1,5 @@
|
||||
error: patterns aren't allowed in RPC args
|
||||
--> $DIR/tarpc_service_arg_pat.rs:3:18
|
||||
|
|
||||
3 | async fn pat((a, b): (u8, u32));
|
||||
| ^^^^^^
|
||||
6
tarpc/tests/compile_fail/tarpc_service_fn_new.rs
Normal file
6
tarpc/tests/compile_fail/tarpc_service_fn_new.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn new();
|
||||
}
|
||||
|
||||
fn main() {}
|
||||
5
tarpc/tests/compile_fail/tarpc_service_fn_new.stderr
Normal file
5
tarpc/tests/compile_fail/tarpc_service_fn_new.stderr
Normal file
@@ -0,0 +1,5 @@
|
||||
error: method name conflicts with generated fn `WorldClient::new`
|
||||
--> $DIR/tarpc_service_fn_new.rs:3:14
|
||||
|
|
||||
3 | async fn new();
|
||||
| ^^^
|
||||
6
tarpc/tests/compile_fail/tarpc_service_fn_serve.rs
Normal file
6
tarpc/tests/compile_fail/tarpc_service_fn_serve.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
#[tarpc::service]
|
||||
trait World {
|
||||
async fn serve();
|
||||
}
|
||||
|
||||
fn main() {}
|
||||
5
tarpc/tests/compile_fail/tarpc_service_fn_serve.stderr
Normal file
5
tarpc/tests/compile_fail/tarpc_service_fn_serve.stderr
Normal file
@@ -0,0 +1,5 @@
|
||||
error: method name conflicts with generated fn `World::serve`
|
||||
--> $DIR/tarpc_service_fn_serve.rs:3:14
|
||||
|
|
||||
3 | async fn serve();
|
||||
| ^^^^^
|
||||
@@ -1,6 +1,6 @@
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{
|
||||
future::{ready, Ready},
|
||||
future::{join_all, ready, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use std::io;
|
||||
@@ -10,6 +10,7 @@ use tarpc::{
|
||||
server::{self, BaseChannel, Channel, Handler},
|
||||
transport::channel,
|
||||
};
|
||||
use tokio::join;
|
||||
use tokio_serde::formats::Json;
|
||||
|
||||
#[tarpc_plugins::service]
|
||||
@@ -110,3 +111,59 @@ async fn concurrent() -> io::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(threaded_scheduler)]
|
||||
async fn concurrent_join() -> io::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let (tx, rx) = channel::unbounded();
|
||||
tokio::spawn(
|
||||
tarpc::Server::default()
|
||||
.incoming(stream::once(ready(rx)))
|
||||
.respond_with(Server.serve()),
|
||||
);
|
||||
|
||||
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
|
||||
|
||||
let mut c = client.clone();
|
||||
let req1 = c.add(context::current(), 1, 2);
|
||||
|
||||
let mut c = client.clone();
|
||||
let req2 = c.add(context::current(), 3, 4);
|
||||
|
||||
let mut c = client.clone();
|
||||
let req3 = c.hey(context::current(), "Tim".to_string());
|
||||
|
||||
let (resp1, resp2, resp3) = join!(req1, req2, req3);
|
||||
assert_matches!(resp1, Ok(3));
|
||||
assert_matches!(resp2, Ok(7));
|
||||
assert_matches!(resp3, Ok(ref s) if s == "Hey, Tim.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(threaded_scheduler)]
|
||||
async fn concurrent_join_all() -> io::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let (tx, rx) = channel::unbounded();
|
||||
tokio::spawn(
|
||||
tarpc::Server::default()
|
||||
.incoming(stream::once(ready(rx)))
|
||||
.respond_with(Server.serve()),
|
||||
);
|
||||
|
||||
let client = ServiceClient::new(client::Config::default(), tx).spawn()?;
|
||||
|
||||
let mut c1 = client.clone();
|
||||
let mut c2 = client.clone();
|
||||
|
||||
let req1 = c1.add(context::current(), 1, 2);
|
||||
let req2 = c2.add(context::current(), 3, 4);
|
||||
|
||||
let responses = join_all(vec![req1, req2]).await;
|
||||
assert_matches!(responses[0], Ok(3));
|
||||
assert_matches!(responses[1], Ok(7));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user