mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
19 Commits
client-clo
...
v0.29.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3bf247ae40 | ||
|
|
69442d2368 | ||
|
|
e135e39504 | ||
|
|
a3a6404a30 | ||
|
|
b36eac80b1 | ||
|
|
d7070e4bc3 | ||
|
|
b5d1828308 | ||
|
|
92cfe63c4f | ||
|
|
839a2f067c | ||
|
|
b5d593488c | ||
|
|
eea38b8bf4 | ||
|
|
70493c15f4 | ||
|
|
f7c5d6a7c3 | ||
|
|
98c5d2a18b | ||
|
|
46b534f7c6 | ||
|
|
42b4fc52b1 | ||
|
|
350dbcdad0 | ||
|
|
b1b4461d89 | ||
|
|
f694b7573a |
14
README.md
14
README.md
@@ -40,7 +40,7 @@ rather than in a separate language such as .proto. This means there's no separat
|
|||||||
process, and no context switching between different languages.
|
process, and no context switching between different languages.
|
||||||
|
|
||||||
Some other features of tarpc:
|
Some other features of tarpc:
|
||||||
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
|
- Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
|
||||||
used as a transport to connect the client and server.
|
used as a transport to connect the client and server.
|
||||||
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
|
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
|
||||||
- Cascading cancellation: dropping a request will send a cancellation message to the server.
|
- Cascading cancellation: dropping a request will send a cancellation message to the server.
|
||||||
@@ -55,7 +55,7 @@ Some other features of tarpc:
|
|||||||
[tracing](https://github.com/tokio-rs/tracing) primitives extended with
|
[tracing](https://github.com/tokio-rs/tracing) primitives extended with
|
||||||
[OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
|
[OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
|
||||||
[Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
|
[Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
|
||||||
each RPC can be traced through the client, server, amd other dependencies downstream of the
|
each RPC can be traced through the client, server, and other dependencies downstream of the
|
||||||
server. Even for applications not connected to a distributed tracing collector, the
|
server. Even for applications not connected to a distributed tracing collector, the
|
||||||
instrumentation can also be ingested by regular loggers like
|
instrumentation can also be ingested by regular loggers like
|
||||||
[env_logger](https://github.com/env-logger-rs/env_logger/).
|
[env_logger](https://github.com/env-logger-rs/env_logger/).
|
||||||
@@ -67,7 +67,7 @@ Some other features of tarpc:
|
|||||||
Add to your `Cargo.toml` dependencies:
|
Add to your `Cargo.toml` dependencies:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
tarpc = "0.27"
|
tarpc = "0.29"
|
||||||
```
|
```
|
||||||
|
|
||||||
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||||
@@ -82,7 +82,7 @@ your `Cargo.toml`:
|
|||||||
```toml
|
```toml
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
tarpc = { version = "0.27", features = ["tokio1"] }
|
tarpc = { version = "0.29", features = ["tokio1"] }
|
||||||
tokio = { version = "1.0", features = ["macros"] }
|
tokio = { version = "1.0", features = ["macros"] }
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -100,7 +100,7 @@ use futures::{
|
|||||||
};
|
};
|
||||||
use tarpc::{
|
use tarpc::{
|
||||||
client, context,
|
client, context,
|
||||||
server::{self, incoming::Incoming},
|
server::{self, incoming::Incoming, Channel},
|
||||||
};
|
};
|
||||||
|
|
||||||
// This is the service definition. It looks a lot like a trait definition.
|
// This is the service definition. It looks a lot like a trait definition.
|
||||||
@@ -128,7 +128,7 @@ impl World for HelloServer {
|
|||||||
type HelloFut = Ready<String>;
|
type HelloFut = Ready<String>;
|
||||||
|
|
||||||
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||||
future::ready(format!("Hello, {}!", name))
|
future::ready(format!("Hello, {name}!"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@@ -155,7 +155,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||||
let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
||||||
|
|
||||||
println!("{}", hello);
|
println!("{hello}");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
24
RELEASES.md
24
RELEASES.md
@@ -1,8 +1,30 @@
|
|||||||
|
## 0.29.0 (2022-05-26)
|
||||||
|
|
||||||
|
### Breaking Changes
|
||||||
|
|
||||||
|
`Context.deadline` is now serialized as a Duration. This prevents clock skew from affecting deadline
|
||||||
|
behavior. For more details see https://github.com/google/tarpc/pull/367 and its [related
|
||||||
|
issue](https://github.com/google/tarpc/issues/366).
|
||||||
|
|
||||||
|
## 0.28.0 (2022-04-06)
|
||||||
|
|
||||||
|
### Breaking Changes
|
||||||
|
|
||||||
|
- The minimum supported Rust version has increased to 1.58.0.
|
||||||
|
- The version of opentelemetry depended on by tarpc has increased to 0.17.0.
|
||||||
|
|
||||||
|
## 0.27.2 (2021-10-08)
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
Clients will now close their transport before dropping it. An attempt at a clean shutdown can help
|
||||||
|
the server drop its connections more quickly.
|
||||||
|
|
||||||
## 0.27.1 (2021-09-22)
|
## 0.27.1 (2021-09-22)
|
||||||
|
|
||||||
### Breaking Changes
|
### Breaking Changes
|
||||||
|
|
||||||
### RPC error type is changing
|
#### RPC error type is changing
|
||||||
|
|
||||||
RPC return types are changing from `Result<Response, io::Error>` to `Result<Response,
|
RPC return types are changing from `Result<Response, io::Error>` to `Result<Response,
|
||||||
tarpc::client::RpcError>`.
|
tarpc::client::RpcError>`.
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc-example-service"
|
name = "tarpc-example-service"
|
||||||
version = "0.10.0"
|
version = "0.11.0"
|
||||||
|
rust-version = "1.56"
|
||||||
authors = ["Tim Kuehn <tikue@google.com>"]
|
authors = ["Tim Kuehn <tikue@google.com>"]
|
||||||
edition = "2018"
|
edition = "2021"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
documentation = "https://docs.rs/tarpc-example-service"
|
documentation = "https://docs.rs/tarpc-example-service"
|
||||||
homepage = "https://github.com/google/tarpc"
|
homepage = "https://github.com/google/tarpc"
|
||||||
@@ -14,13 +15,13 @@ description = "An example server built on tarpc."
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
clap = "3.0.0-beta.2"
|
clap = { version = "3.0.0-rc.9", features = ["derive"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
|
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
|
||||||
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
|
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
tarpc = { version = "0.27", path = "../tarpc", features = ["full"] }
|
tarpc = { version = "0.29", path = "../tarpc", features = ["full"] }
|
||||||
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
|
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
|
||||||
tracing = { version = "0.1" }
|
tracing = { version = "0.1" }
|
||||||
tracing-opentelemetry = "0.15"
|
tracing-opentelemetry = "0.15"
|
||||||
|
|||||||
@@ -4,14 +4,14 @@
|
|||||||
// license that can be found in the LICENSE file or at
|
// license that can be found in the LICENSE file or at
|
||||||
// https://opensource.org/licenses/MIT.
|
// https://opensource.org/licenses/MIT.
|
||||||
|
|
||||||
use clap::Clap;
|
use clap::Parser;
|
||||||
use service::{init_tracing, WorldClient};
|
use service::{init_tracing, WorldClient};
|
||||||
use std::{net::SocketAddr, time::Duration};
|
use std::{net::SocketAddr, time::Duration};
|
||||||
use tarpc::{client, context, tokio_serde::formats::Json};
|
use tarpc::{client, context, tokio_serde::formats::Json};
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use tracing::Instrument;
|
use tracing::Instrument;
|
||||||
|
|
||||||
#[derive(Clap)]
|
#[derive(Parser)]
|
||||||
struct Flags {
|
struct Flags {
|
||||||
/// Sets the server address to connect to.
|
/// Sets the server address to connect to.
|
||||||
#[clap(long)]
|
#[clap(long)]
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
// license that can be found in the LICENSE file or at
|
// license that can be found in the LICENSE file or at
|
||||||
// https://opensource.org/licenses/MIT.
|
// https://opensource.org/licenses/MIT.
|
||||||
|
|
||||||
use clap::Clap;
|
use clap::Parser;
|
||||||
use futures::{future, prelude::*};
|
use futures::{future, prelude::*};
|
||||||
use rand::{
|
use rand::{
|
||||||
distributions::{Distribution, Uniform},
|
distributions::{Distribution, Uniform},
|
||||||
@@ -22,7 +22,7 @@ use tarpc::{
|
|||||||
};
|
};
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
|
|
||||||
#[derive(Clap)]
|
#[derive(Parser)]
|
||||||
struct Flags {
|
struct Flags {
|
||||||
/// Sets the port number to listen on.
|
/// Sets the port number to listen on.
|
||||||
#[clap(long)]
|
#[clap(long)]
|
||||||
@@ -40,7 +40,7 @@ impl World for HelloServer {
|
|||||||
let sleep_time =
|
let sleep_time =
|
||||||
Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
|
Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
|
||||||
time::sleep(sleep_time).await;
|
time::sleep(sleep_time).await;
|
||||||
format!("Hello, {}! You are connected from {}", name, self.0)
|
format!("Hello, {name}! You are connected from {}", self.0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc-plugins"
|
name = "tarpc-plugins"
|
||||||
version = "0.12.0"
|
version = "0.12.0"
|
||||||
|
rust-version = "1.56"
|
||||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||||
edition = "2018"
|
edition = "2021"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
documentation = "https://docs.rs/tarpc-plugins"
|
documentation = "https://docs.rs/tarpc-plugins"
|
||||||
homepage = "https://github.com/google/tarpc"
|
homepage = "https://github.com/google/tarpc"
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ impl Parse for Service {
|
|||||||
ident_errors,
|
ident_errors,
|
||||||
syn::Error::new(
|
syn::Error::new(
|
||||||
rpc.ident.span(),
|
rpc.ident.span(),
|
||||||
format!("method name conflicts with generated fn `{}::serve`", ident)
|
format!("method name conflicts with generated fn `{ident}::serve`")
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -270,7 +270,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
|||||||
let methods = rpcs.iter().map(|rpc| &rpc.ident).collect::<Vec<_>>();
|
let methods = rpcs.iter().map(|rpc| &rpc.ident).collect::<Vec<_>>();
|
||||||
let request_names = methods
|
let request_names = methods
|
||||||
.iter()
|
.iter()
|
||||||
.map(|m| format!("{}.{}", ident, m))
|
.map(|m| format!("{ident}.{m}"))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
ServiceGenerator {
|
ServiceGenerator {
|
||||||
@@ -306,7 +306,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
|
|||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
future_types: &camel_case_fn_names
|
future_types: &camel_case_fn_names
|
||||||
.iter()
|
.iter()
|
||||||
.map(|name| parse_str(&format!("{}Fut", name)).unwrap())
|
.map(|name| parse_str(&format!("{name}Fut")).unwrap())
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
derive_serialize: derive_serialize.as_ref(),
|
derive_serialize: derive_serialize.as_ref(),
|
||||||
}
|
}
|
||||||
@@ -409,7 +409,7 @@ fn verify_types_were_provided(
|
|||||||
if !provided.iter().any(|typedecl| typedecl.ident == expected) {
|
if !provided.iter().any(|typedecl| typedecl.ident == expected) {
|
||||||
let mut e = syn::Error::new(
|
let mut e = syn::Error::new(
|
||||||
span,
|
span,
|
||||||
format!("not all trait items implemented, missing: `{}`", expected),
|
format!("not all trait items implemented, missing: `{expected}`"),
|
||||||
);
|
);
|
||||||
let fn_span = method.sig.fn_token.span();
|
let fn_span = method.sig.fn_token.span();
|
||||||
e.extend(syn::Error::new(
|
e.extend(syn::Error::new(
|
||||||
@@ -479,7 +479,7 @@ impl<'a> ServiceGenerator<'a> {
|
|||||||
),
|
),
|
||||||
output,
|
output,
|
||||||
)| {
|
)| {
|
||||||
let ty_doc = format!("The response future returned by [`{}::{}`].", service_ident, ident);
|
let ty_doc = format!("The response future returned by [`{service_ident}::{ident}`].");
|
||||||
quote! {
|
quote! {
|
||||||
#[doc = #ty_doc]
|
#[doc = #ty_doc]
|
||||||
type #future_type: std::future::Future<Output = #output>;
|
type #future_type: std::future::Future<Output = #output>;
|
||||||
|
|||||||
@@ -1,8 +1,12 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc"
|
name = "tarpc"
|
||||||
version = "0.27.1"
|
version = "0.29.0"
|
||||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
rust-version = "1.58.0"
|
||||||
edition = "2018"
|
authors = [
|
||||||
|
"Adam Wright <adam.austin.wright@gmail.com>",
|
||||||
|
"Tim Kuehn <timothy.j.kuehn@gmail.com>",
|
||||||
|
]
|
||||||
|
edition = "2021"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
documentation = "https://docs.rs/tarpc"
|
documentation = "https://docs.rs/tarpc"
|
||||||
homepage = "https://github.com/google/tarpc"
|
homepage = "https://github.com/google/tarpc"
|
||||||
@@ -16,13 +20,20 @@ description = "An RPC framework for Rust with a focus on ease of use."
|
|||||||
default = []
|
default = []
|
||||||
|
|
||||||
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
|
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
|
||||||
tokio1 = ["tokio/rt-multi-thread"]
|
tokio1 = ["tokio/rt"]
|
||||||
serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
|
serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
|
||||||
serde-transport-json = ["tokio-serde/json"]
|
serde-transport-json = ["tokio-serde/json"]
|
||||||
serde-transport-bincode = ["tokio-serde/bincode"]
|
serde-transport-bincode = ["tokio-serde/bincode"]
|
||||||
tcp = ["tokio/net"]
|
tcp = ["tokio/net"]
|
||||||
|
|
||||||
full = ["serde1", "tokio1", "serde-transport", "serde-transport-json", "serde-transport-bincode", "tcp"]
|
full = [
|
||||||
|
"serde1",
|
||||||
|
"tokio1",
|
||||||
|
"serde-transport",
|
||||||
|
"serde-transport-json",
|
||||||
|
"serde-transport-bincode",
|
||||||
|
"tcp",
|
||||||
|
]
|
||||||
|
|
||||||
[badges]
|
[badges]
|
||||||
travis-ci = { repository = "google/tarpc" }
|
travis-ci = { repository = "google/tarpc" }
|
||||||
@@ -39,11 +50,14 @@ static_assertions = "1.1.0"
|
|||||||
tarpc-plugins = { path = "../plugins", version = "0.12" }
|
tarpc-plugins = { path = "../plugins", version = "0.12" }
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio = { version = "1", features = ["time"] }
|
tokio = { version = "1", features = ["time"] }
|
||||||
tokio-util = { version = "0.6.3", features = ["time"] }
|
tokio-util = { version = "0.6.9", features = ["time"] }
|
||||||
tokio-serde = { optional = true, version = "0.8" }
|
tokio-serde = { optional = true, version = "0.8" }
|
||||||
tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
|
tracing = { version = "0.1", default-features = false, features = [
|
||||||
tracing-opentelemetry = { version = "0.15", default-features = false }
|
"attributes",
|
||||||
opentelemetry = { version = "0.16", default-features = false }
|
"log",
|
||||||
|
] }
|
||||||
|
tracing-opentelemetry = { version = "0.17.2", default-features = false }
|
||||||
|
opentelemetry = { version = "0.17.0", default-features = false }
|
||||||
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
@@ -52,11 +66,13 @@ bincode = "1.3"
|
|||||||
bytes = { version = "1", features = ["serde"] }
|
bytes = { version = "1", features = ["serde"] }
|
||||||
flate2 = "1.0"
|
flate2 = "1.0"
|
||||||
futures-test = "0.3"
|
futures-test = "0.3"
|
||||||
opentelemetry = { version = "0.16", default-features = false, features = ["rt-tokio"] }
|
opentelemetry = { version = "0.17.0", default-features = false, features = [
|
||||||
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
|
"rt-tokio",
|
||||||
|
] }
|
||||||
|
opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio"] }
|
||||||
pin-utils = "0.1.0-alpha"
|
pin-utils = "0.1.0-alpha"
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
tracing-subscriber = "0.2"
|
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
tokio = { version = "1", features = ["full", "test-util"] }
|
tokio = { version = "1", features = ["full", "test-util"] }
|
||||||
tokio-serde = { version = "0.8", features = ["json", "bincode"] }
|
tokio-serde = { version = "0.8", features = ["json", "bincode"] }
|
||||||
trybuild = "1.0"
|
trybuild = "1.0"
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ where
|
|||||||
if algorithm != CompressionAlgorithm::Deflate {
|
if algorithm != CompressionAlgorithm::Deflate {
|
||||||
return Err(io::Error::new(
|
return Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidData,
|
io::ErrorKind::InvalidData,
|
||||||
format!("Compression algorithm {:?} not supported", algorithm),
|
format!("Compression algorithm {algorithm:?} not supported"),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
let mut deflater = DeflateDecoder::new(payload.as_slice());
|
let mut deflater = DeflateDecoder::new(payload.as_slice());
|
||||||
@@ -102,7 +102,7 @@ struct HelloServer;
|
|||||||
#[tarpc::server]
|
#[tarpc::server]
|
||||||
impl World for HelloServer {
|
impl World for HelloServer {
|
||||||
async fn hello(self, _: context::Context, name: String) -> String {
|
async fn hello(self, _: context::Context, name: String) -> String {
|
||||||
format!("Hey, {}!", name)
|
format!("Hey, {name}!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -290,7 +290,7 @@ fn init_tracing(service_name: &str) -> anyhow::Result<()> {
|
|||||||
.install_batch(opentelemetry::runtime::Tokio)?;
|
.install_batch(opentelemetry::runtime::Tokio)?;
|
||||||
|
|
||||||
tracing_subscriber::registry()
|
tracing_subscriber::registry()
|
||||||
.with(tracing_subscriber::EnvFilter::from_default_env())
|
.with(tracing_subscriber::filter::EnvFilter::from_default_env())
|
||||||
.with(tracing_subscriber::fmt::layer())
|
.with(tracing_subscriber::fmt::layer())
|
||||||
.with(tracing_opentelemetry::layer().with_tracer(tracer))
|
.with(tracing_opentelemetry::layer().with_tracer(tracer))
|
||||||
.try_init()?;
|
.try_init()?;
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ impl World for HelloServer {
|
|||||||
type HelloFut = Ready<String>;
|
type HelloFut = Ready<String>;
|
||||||
|
|
||||||
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||||
future::ready(format!("Hello, {}!", name))
|
future::ready(format!("Hello, {name}!"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,7 +49,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
// specifies a deadline and trace information which can be helpful in debugging requests.
|
// specifies a deadline and trace information which can be helpful in debugging requests.
|
||||||
let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
||||||
|
|
||||||
println!("{}", hello);
|
println!("{hello}");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@
|
|||||||
|
|
||||||
use crate::{add::Add as AddService, double::Double as DoubleService};
|
use crate::{add::Add as AddService, double::Double as DoubleService};
|
||||||
use futures::{future, prelude::*};
|
use futures::{future, prelude::*};
|
||||||
use std::env;
|
|
||||||
use tarpc::{
|
use tarpc::{
|
||||||
client, context,
|
client, context,
|
||||||
server::{incoming::Incoming, BaseChannel},
|
server::{incoming::Incoming, BaseChannel},
|
||||||
@@ -56,9 +55,9 @@ impl DoubleService for DoubleServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn init_tracing(service_name: &str) -> anyhow::Result<()> {
|
fn init_tracing(service_name: &str) -> anyhow::Result<()> {
|
||||||
env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
|
|
||||||
let tracer = opentelemetry_jaeger::new_pipeline()
|
let tracer = opentelemetry_jaeger::new_pipeline()
|
||||||
.with_service_name(service_name)
|
.with_service_name(service_name)
|
||||||
|
.with_auto_split_batch(true)
|
||||||
.with_max_packet_size(2usize.pow(13))
|
.with_max_packet_size(2usize.pow(13))
|
||||||
.install_batch(opentelemetry::runtime::Tokio)?;
|
.install_batch(opentelemetry::runtime::Tokio)?;
|
||||||
|
|
||||||
|
|||||||
@@ -81,14 +81,10 @@ impl<C, D> fmt::Debug for NewClient<C, D> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
const _CHECK_USIZE: () = assert!(
|
||||||
#[allow(clippy::no_effect)]
|
std::mem::size_of::<usize>() <= std::mem::size_of::<u64>(),
|
||||||
const CHECK_USIZE: () = {
|
"usize is too big to fit in u64"
|
||||||
if std::mem::size_of::<usize>() > std::mem::size_of::<u64>() {
|
);
|
||||||
// TODO: replace this with panic!() as soon as RFC 2345 gets stabilized
|
|
||||||
["usize is too big to fit in u64"][42];
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Handles communication from the client to request dispatch.
|
/// Handles communication from the client to request dispatch.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -118,6 +114,7 @@ impl<Req, Resp> Channel<Req, Resp> {
|
|||||||
skip(self, ctx, request_name, request),
|
skip(self, ctx, request_name, request),
|
||||||
fields(
|
fields(
|
||||||
rpc.trace_id = tracing::field::Empty,
|
rpc.trace_id = tracing::field::Empty,
|
||||||
|
rpc.deadline = %humantime::format_rfc3339(ctx.deadline),
|
||||||
otel.kind = "client",
|
otel.kind = "client",
|
||||||
otel.name = request_name)
|
otel.name = request_name)
|
||||||
)]
|
)]
|
||||||
@@ -172,7 +169,8 @@ struct ResponseGuard<'a, Resp> {
|
|||||||
|
|
||||||
/// An error that can occur in the processing of an RPC. This is not request-specific errors but
|
/// An error that can occur in the processing of an RPC. This is not request-specific errors but
|
||||||
/// rather cross-cutting errors that can always occur.
|
/// rather cross-cutting errors that can always occur.
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
|
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
|
||||||
pub enum RpcError {
|
pub enum RpcError {
|
||||||
/// The client disconnected from the server.
|
/// The client disconnected from the server.
|
||||||
#[error("the client disconnected from the server")]
|
#[error("the client disconnected from the server")]
|
||||||
@@ -291,6 +289,9 @@ where
|
|||||||
/// Could not flush the transport.
|
/// Could not flush the transport.
|
||||||
#[error("could not flush the transport")]
|
#[error("could not flush the transport")]
|
||||||
Flush(#[source] E),
|
Flush(#[source] E),
|
||||||
|
/// Could not close the write end of the transport.
|
||||||
|
#[error("could not close the write end of the transport")]
|
||||||
|
Close(#[source] E),
|
||||||
/// Could not poll expired requests.
|
/// Could not poll expired requests.
|
||||||
#[error("could not poll expired requests")]
|
#[error("could not poll expired requests")]
|
||||||
Timer(#[source] tokio::time::error::Error),
|
Timer(#[source] tokio::time::error::Error),
|
||||||
@@ -335,6 +336,15 @@ where
|
|||||||
.map_err(ChannelError::Flush)
|
.map_err(ChannelError::Flush)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_close<'a>(
|
||||||
|
self: &'a mut Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Result<(), ChannelError<C::Error>>> {
|
||||||
|
self.transport_pin_mut()
|
||||||
|
.poll_close(cx)
|
||||||
|
.map_err(ChannelError::Close)
|
||||||
|
}
|
||||||
|
|
||||||
fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests {
|
fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests {
|
||||||
self.as_mut().project().canceled_requests
|
self.as_mut().project().canceled_requests
|
||||||
}
|
}
|
||||||
@@ -394,7 +404,7 @@ where
|
|||||||
|
|
||||||
match (pending_requests_status, canceled_requests_status) {
|
match (pending_requests_status, canceled_requests_status) {
|
||||||
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
|
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
|
||||||
ready!(self.poll_flush(cx)?);
|
ready!(self.poll_close(cx)?);
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
(ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => {
|
(ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => {
|
||||||
@@ -510,11 +520,7 @@ where
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
self.start_send(request)?;
|
self.start_send(request)?;
|
||||||
let deadline = ctx.deadline;
|
tracing::info!("SendRequest");
|
||||||
tracing::info!(
|
|
||||||
tarpc.deadline = %humantime::format_rfc3339(deadline),
|
|
||||||
"SendRequest"
|
|
||||||
);
|
|
||||||
drop(entered);
|
drop(entered);
|
||||||
|
|
||||||
self.in_flight_requests()
|
self.in_flight_requests()
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ pub struct Context {
|
|||||||
/// When the client expects the request to be complete by. The server should cancel the request
|
/// When the client expects the request to be complete by. The server should cancel the request
|
||||||
/// if it is not complete by this time.
|
/// if it is not complete by this time.
|
||||||
#[cfg_attr(feature = "serde1", serde(default = "ten_seconds_from_now"))]
|
#[cfg_attr(feature = "serde1", serde(default = "ten_seconds_from_now"))]
|
||||||
|
// Serialized as a Duration to prevent clock skew issues.
|
||||||
|
#[cfg_attr(feature = "serde1", serde(with = "absolute_to_relative_time"))]
|
||||||
pub deadline: SystemTime,
|
pub deadline: SystemTime,
|
||||||
/// Uniquely identifies requests originating from the same source.
|
/// Uniquely identifies requests originating from the same source.
|
||||||
/// When a service handles a request by making requests itself, those requests should
|
/// When a service handles a request by making requests itself, those requests should
|
||||||
@@ -36,6 +38,54 @@ pub struct Context {
|
|||||||
pub trace_context: trace::Context,
|
pub trace_context: trace::Context,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde1")]
|
||||||
|
mod absolute_to_relative_time {
|
||||||
|
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
|
pub use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
|
pub fn serialize<S>(deadline: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
let deadline = deadline
|
||||||
|
.duration_since(SystemTime::now())
|
||||||
|
.unwrap_or(Duration::ZERO);
|
||||||
|
deadline.serialize(serializer)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let deadline = Duration::deserialize(deserializer)?;
|
||||||
|
Ok(SystemTime::now() + deadline)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[derive(serde::Serialize, serde::Deserialize)]
|
||||||
|
struct AbsoluteToRelative(#[serde(with = "self")] SystemTime);
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_serialize() {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
let deadline = now + Duration::from_secs(10);
|
||||||
|
let serialized_deadline = bincode::serialize(&AbsoluteToRelative(deadline)).unwrap();
|
||||||
|
let deserialized_deadline: Duration = bincode::deserialize(&serialized_deadline).unwrap();
|
||||||
|
// TODO: how to avoid flakiness?
|
||||||
|
assert!(deserialized_deadline > Duration::from_secs(9));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deserialize() {
|
||||||
|
let deadline = Duration::from_secs(10);
|
||||||
|
let serialized_deadline = bincode::serialize(&deadline).unwrap();
|
||||||
|
let AbsoluteToRelative(deserialized_deadline) =
|
||||||
|
bincode::deserialize(&serialized_deadline).unwrap();
|
||||||
|
// TODO: how to avoid flakiness?
|
||||||
|
assert!(deserialized_deadline > SystemTime::now() + Duration::from_secs(9));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
assert_impl_all!(Context: Send, Sync);
|
assert_impl_all!(Context: Send, Sync);
|
||||||
|
|
||||||
fn ten_seconds_from_now() -> SystemTime {
|
fn ten_seconds_from_now() -> SystemTime {
|
||||||
|
|||||||
@@ -27,7 +27,7 @@
|
|||||||
//! process, and no context switching between different languages.
|
//! process, and no context switching between different languages.
|
||||||
//!
|
//!
|
||||||
//! Some other features of tarpc:
|
//! Some other features of tarpc:
|
||||||
//! - Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
|
//! - Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
|
||||||
//! used as a transport to connect the client and server.
|
//! used as a transport to connect the client and server.
|
||||||
//! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
|
//! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
|
||||||
//! - Cascading cancellation: dropping a request will send a cancellation message to the server.
|
//! - Cascading cancellation: dropping a request will send a cancellation message to the server.
|
||||||
@@ -42,7 +42,7 @@
|
|||||||
//! [tracing](https://github.com/tokio-rs/tracing) primitives extended with
|
//! [tracing](https://github.com/tokio-rs/tracing) primitives extended with
|
||||||
//! [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
|
//! [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
|
||||||
//! [Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
|
//! [Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
|
||||||
//! each RPC can be traced through the client, server, amd other dependencies downstream of the
|
//! each RPC can be traced through the client, server, and other dependencies downstream of the
|
||||||
//! server. Even for applications not connected to a distributed tracing collector, the
|
//! server. Even for applications not connected to a distributed tracing collector, the
|
||||||
//! instrumentation can also be ingested by regular loggers like
|
//! instrumentation can also be ingested by regular loggers like
|
||||||
//! [env_logger](https://github.com/env-logger-rs/env_logger/).
|
//! [env_logger](https://github.com/env-logger-rs/env_logger/).
|
||||||
@@ -54,7 +54,7 @@
|
|||||||
//! Add to your `Cargo.toml` dependencies:
|
//! Add to your `Cargo.toml` dependencies:
|
||||||
//!
|
//!
|
||||||
//! ```toml
|
//! ```toml
|
||||||
//! tarpc = "0.27"
|
//! tarpc = "0.29"
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
|
||||||
@@ -69,7 +69,7 @@
|
|||||||
//! ```toml
|
//! ```toml
|
||||||
//! anyhow = "1.0"
|
//! anyhow = "1.0"
|
||||||
//! futures = "0.3"
|
//! futures = "0.3"
|
||||||
//! tarpc = { version = "0.27", features = ["tokio1"] }
|
//! tarpc = { version = "0.29", features = ["tokio1"] }
|
||||||
//! tokio = { version = "1.0", features = ["macros"] }
|
//! tokio = { version = "1.0", features = ["macros"] }
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
@@ -88,7 +88,7 @@
|
|||||||
//! };
|
//! };
|
||||||
//! use tarpc::{
|
//! use tarpc::{
|
||||||
//! client, context,
|
//! client, context,
|
||||||
//! server::{self, incoming::Incoming},
|
//! server::{self, incoming::Incoming, Channel},
|
||||||
//! };
|
//! };
|
||||||
//!
|
//!
|
||||||
//! // This is the service definition. It looks a lot like a trait definition.
|
//! // This is the service definition. It looks a lot like a trait definition.
|
||||||
@@ -132,7 +132,7 @@
|
|||||||
//! type HelloFut = Ready<String>;
|
//! type HelloFut = Ready<String>;
|
||||||
//!
|
//!
|
||||||
//! fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
//! fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||||
//! future::ready(format!("Hello, {}!", name))
|
//! future::ready(format!("Hello, {name}!"))
|
||||||
//! }
|
//! }
|
||||||
//! }
|
//! }
|
||||||
//! ```
|
//! ```
|
||||||
@@ -168,7 +168,7 @@
|
|||||||
//! # // an associated type representing the future output by the fn.
|
//! # // an associated type representing the future output by the fn.
|
||||||
//! # type HelloFut = Ready<String>;
|
//! # type HelloFut = Ready<String>;
|
||||||
//! # fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
//! # fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
|
||||||
//! # future::ready(format!("Hello, {}!", name))
|
//! # future::ready(format!("Hello, {name}!"))
|
||||||
//! # }
|
//! # }
|
||||||
//! # }
|
//! # }
|
||||||
//! # #[cfg(not(feature = "tokio1"))]
|
//! # #[cfg(not(feature = "tokio1"))]
|
||||||
@@ -190,7 +190,7 @@
|
|||||||
//! // specifies a deadline and trace information which can be helpful in debugging requests.
|
//! // specifies a deadline and trace information which can be helpful in debugging requests.
|
||||||
//! let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
//! let hello = client.hello(context::current(), "Stim".to_string()).await?;
|
||||||
//!
|
//!
|
||||||
//! println!("{}", hello);
|
//! println!("{hello}");
|
||||||
//!
|
//!
|
||||||
//! Ok(())
|
//! Ok(())
|
||||||
//! }
|
//! }
|
||||||
@@ -264,7 +264,7 @@ pub use tarpc_plugins::service;
|
|||||||
/// #[tarpc::server]
|
/// #[tarpc::server]
|
||||||
/// impl World for HelloServer {
|
/// impl World for HelloServer {
|
||||||
/// async fn hello(self, _: context::Context, name: String) -> String {
|
/// async fn hello(self, _: context::Context, name: String) -> String {
|
||||||
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
|
/// format!("Hello, {name}! You are connected from {:?}.", self.0)
|
||||||
/// }
|
/// }
|
||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
@@ -290,7 +290,7 @@ pub use tarpc_plugins::service;
|
|||||||
/// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
|
/// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
|
||||||
/// + Send>> {
|
/// + Send>> {
|
||||||
/// Box::pin(async move {
|
/// Box::pin(async move {
|
||||||
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
|
/// format!("Hello, {name}! You are connected from {:?}.", self.0)
|
||||||
/// })
|
/// })
|
||||||
/// }
|
/// }
|
||||||
/// }
|
/// }
|
||||||
|
|||||||
@@ -161,6 +161,7 @@ where
|
|||||||
let span = info_span!(
|
let span = info_span!(
|
||||||
"RPC",
|
"RPC",
|
||||||
rpc.trace_id = %request.context.trace_id(),
|
rpc.trace_id = %request.context.trace_id(),
|
||||||
|
rpc.deadline = %humantime::format_rfc3339(request.context.deadline),
|
||||||
otel.kind = "server",
|
otel.kind = "server",
|
||||||
otel.name = tracing::field::Empty,
|
otel.name = tracing::field::Empty,
|
||||||
);
|
);
|
||||||
@@ -333,6 +334,7 @@ where
|
|||||||
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
|
type Item = Result<TrackedRequest<Req>, ChannelError<T::Error>>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
#[derive(Debug)]
|
||||||
enum ReceiverStatus {
|
enum ReceiverStatus {
|
||||||
Ready,
|
Ready,
|
||||||
Pending,
|
Pending,
|
||||||
@@ -388,6 +390,11 @@ where
|
|||||||
Poll::Pending => Pending,
|
Poll::Pending => Pending,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
tracing::trace!(
|
||||||
|
"Expired requests: {:?}, Inbound: {:?}",
|
||||||
|
expiration_status,
|
||||||
|
request_status
|
||||||
|
);
|
||||||
match (expiration_status, request_status) {
|
match (expiration_status, request_status) {
|
||||||
(Ready, _) | (_, Ready) => continue,
|
(Ready, _) | (_, Ready) => continue,
|
||||||
(Closed, Closed) => return Poll::Ready(None),
|
(Closed, Closed) => return Poll::Ready(None),
|
||||||
|
|||||||
@@ -98,6 +98,11 @@ impl InFlightRequests {
|
|||||||
&mut self,
|
&mut self,
|
||||||
cx: &mut Context,
|
cx: &mut Context,
|
||||||
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
|
) -> Poll<Option<Result<u64, tokio::time::error::Error>>> {
|
||||||
|
if self.deadlines.is_empty() {
|
||||||
|
// TODO(https://github.com/tokio-rs/tokio/issues/4161)
|
||||||
|
// This is a workaround for DelayQueue not always treating this case correctly.
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
self.deadlines.poll_expired(cx).map_ok(|expired| {
|
self.deadlines.poll_expired(cx).map_ok(|expired| {
|
||||||
if let Some(RequestData {
|
if let Some(RequestData {
|
||||||
abort_handle, span, ..
|
abort_handle, span, ..
|
||||||
@@ -184,12 +189,31 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn remove_request_doesnt_abort() {
|
async fn remove_request_doesnt_abort() {
|
||||||
let mut in_flight_requests = InFlightRequests::default();
|
let mut in_flight_requests = InFlightRequests::default();
|
||||||
|
assert!(in_flight_requests.deadlines.is_empty());
|
||||||
|
|
||||||
let abort_registration = in_flight_requests
|
let abort_registration = in_flight_requests
|
||||||
.start_request(0, SystemTime::now(), Span::current())
|
.start_request(
|
||||||
|
0,
|
||||||
|
SystemTime::now() + std::time::Duration::from_secs(10),
|
||||||
|
Span::current(),
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
|
let mut abortable_future = Box::new(Abortable::new(pending::<()>(), abort_registration));
|
||||||
|
|
||||||
|
// Precondition: Pending expiration
|
||||||
|
assert_matches!(
|
||||||
|
in_flight_requests.poll_expired(&mut noop_context()),
|
||||||
|
Poll::Pending
|
||||||
|
);
|
||||||
|
assert!(!in_flight_requests.deadlines.is_empty());
|
||||||
|
|
||||||
assert_matches!(in_flight_requests.remove_request(0), Some(_));
|
assert_matches!(in_flight_requests.remove_request(0), Some(_));
|
||||||
|
// Postcondition: No pending expirations
|
||||||
|
assert!(in_flight_requests.deadlines.is_empty());
|
||||||
|
assert_matches!(
|
||||||
|
in_flight_requests.poll_expired(&mut noop_context()),
|
||||||
|
Poll::Ready(None)
|
||||||
|
);
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
abortable_future.poll_unpin(&mut noop_context()),
|
abortable_future.poll_unpin(&mut noop_context()),
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
|
|||||||
@@ -138,25 +138,25 @@ impl From<u64> for SpanId {
|
|||||||
|
|
||||||
impl From<opentelemetry::trace::TraceId> for TraceId {
|
impl From<opentelemetry::trace::TraceId> for TraceId {
|
||||||
fn from(trace_id: opentelemetry::trace::TraceId) -> Self {
|
fn from(trace_id: opentelemetry::trace::TraceId) -> Self {
|
||||||
Self::from(trace_id.to_u128())
|
Self::from(u128::from_be_bytes(trace_id.to_bytes()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<TraceId> for opentelemetry::trace::TraceId {
|
impl From<TraceId> for opentelemetry::trace::TraceId {
|
||||||
fn from(trace_id: TraceId) -> Self {
|
fn from(trace_id: TraceId) -> Self {
|
||||||
Self::from_u128(trace_id.into())
|
Self::from_bytes(u128::from(trace_id).to_be_bytes())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<opentelemetry::trace::SpanId> for SpanId {
|
impl From<opentelemetry::trace::SpanId> for SpanId {
|
||||||
fn from(span_id: opentelemetry::trace::SpanId) -> Self {
|
fn from(span_id: opentelemetry::trace::SpanId) -> Self {
|
||||||
Self::from(span_id.to_u64())
|
Self::from(u64::from_be_bytes(span_id.to_bytes()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<SpanId> for opentelemetry::trace::SpanId {
|
impl From<SpanId> for opentelemetry::trace::SpanId {
|
||||||
fn from(span_id: SpanId) -> Self {
|
fn from(span_id: SpanId) -> Self {
|
||||||
Self::from_u64(span_id.0)
|
Self::from_bytes(u64::from(span_id).to_be_bytes())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -181,7 +181,7 @@ mod tests {
|
|||||||
future::ready(request.parse::<u64>().map_err(|_| {
|
future::ready(request.parse::<u64>().map_err(|_| {
|
||||||
io::Error::new(
|
io::Error::new(
|
||||||
io::ErrorKind::InvalidInput,
|
io::ErrorKind::InvalidInput,
|
||||||
format!("{:?} is not an int", request),
|
format!("{request:?} is not an int"),
|
||||||
)
|
)
|
||||||
}))
|
}))
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -38,11 +38,34 @@ where
|
|||||||
H: BuildHasher,
|
H: BuildHasher,
|
||||||
{
|
{
|
||||||
fn compact(&mut self, usage_ratio_threshold: f64) {
|
fn compact(&mut self, usage_ratio_threshold: f64) {
|
||||||
if self.capacity() > 1000 {
|
let usage_ratio_threshold = usage_ratio_threshold.clamp(f64::MIN_POSITIVE, 1.);
|
||||||
let usage_ratio = self.len() as f64 / self.capacity() as f64;
|
let cap = f64::max(1000., self.len() as f64 / usage_ratio_threshold);
|
||||||
if usage_ratio < usage_ratio_threshold {
|
self.shrink_to(cap as usize);
|
||||||
self.shrink_to_fit();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_compact() {
|
||||||
|
let mut map = HashMap::with_capacity(2048);
|
||||||
|
assert_eq!(map.capacity(), 3584);
|
||||||
|
|
||||||
|
// Make usage ratio 25%
|
||||||
|
for i in 0..896 {
|
||||||
|
map.insert(format!("k{i}"), "v");
|
||||||
|
}
|
||||||
|
|
||||||
|
map.compact(-1.0);
|
||||||
|
assert_eq!(map.capacity(), 3584);
|
||||||
|
|
||||||
|
map.compact(0.25);
|
||||||
|
assert_eq!(map.capacity(), 3584);
|
||||||
|
|
||||||
|
map.compact(0.50);
|
||||||
|
assert_eq!(map.capacity(), 1792);
|
||||||
|
|
||||||
|
map.compact(1.0);
|
||||||
|
assert_eq!(map.capacity(), 1792);
|
||||||
|
|
||||||
|
map.compact(2.0);
|
||||||
|
assert_eq!(map.capacity(), 1792);
|
||||||
|
}
|
||||||
|
|||||||
@@ -7,8 +7,8 @@ struct HelloServer;
|
|||||||
|
|
||||||
#[tarpc::server]
|
#[tarpc::server]
|
||||||
impl World for HelloServer {
|
impl World for HelloServer {
|
||||||
fn hello(name: String) -> String {
|
fn hello(name: String) -> String {
|
||||||
format!("Hello, {}!", name)
|
format!("Hello, {name}!", name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,5 +7,5 @@ error: not all trait items implemented, missing: `HelloFut`
|
|||||||
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
|
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
|
||||||
--> $DIR/tarpc_server_missing_async.rs:10:5
|
--> $DIR/tarpc_server_missing_async.rs:10:5
|
||||||
|
|
|
|
||||||
10 | fn hello(name: String) -> String {
|
10 | fn hello(name: String) -> String {
|
||||||
| ^^
|
| ^^
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ impl Service for Server {
|
|||||||
type HeyFut = Ready<String>;
|
type HeyFut = Ready<String>;
|
||||||
|
|
||||||
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
|
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
|
||||||
ready(format!("Hey, {}.", name))
|
ready(format!("Hey, {name}."))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user