19 Commits

Author SHA1 Message Date
Tim Kuehn
3bf247ae40 Prepare release of 0.29.0 2022-05-26 16:34:08 -07:00
Tim Kuehn
69442d2368 Serialize RPC deadline as a Duration.
Duration was previously serialized as SystemTime. However, absolute
times run into problems with clock skew: if the remote machine's clock
is too far in the future, the RPC deadline will be exceeded before
request processing can begin. Conversely, if the remote machine's clock
is too far in the past, the RPC deadline will not be enforced.

By converting the absolute deadline to a relative duration, clock skew
is no longer relevant, as the remote machine will convert the deadline
into a time relative to its own clock. This mirrors how the gRPC HTTP2
protocol includes a Timeout in the request headers [0] but the SDK uses
timestamps [1]. Keeping the absolute time in the core APIs maintains all
the benefits of today, namely, natural deadline propagation between RPC
hops when using the current context.

This serialization strategy means that, generally, the remote machine's
deadline will be slightly in the future compared to the local machine.
Depending on network transfer latencies, this could be microseconds to
milliseconds, or worse in the worst case. Because the deadline is not
intended for high-precision scenarios, I don't view this is as
problematic.

Because this change only affects the serialization layer, local
transports that bypass serialization are not affected.

[0] https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
[1] https://grpc.io/blog/deadlines/#setting-a-deadline
2022-05-26 14:14:49 -07:00
Tim Kuehn
e135e39504 Add rpc.deadline tag to Opentelemetry traces. 2022-05-26 13:53:03 -07:00
Tim Kuehn
a3a6404a30 Prepare release of 0.28.0 2022-04-06 22:07:07 -07:00
Tim Kuehn
b36eac80b1 Bump minimum rust version to 1.58.0 2022-04-06 21:53:56 -07:00
Bruno
d7070e4bc3 Update opentelemetry and related dependencies (#362) 2022-04-03 14:09:14 -07:00
Tim Kuehn
b5d1828308 Use captured identifiers in format strings.
This was stabilized in Rust 1.58.0: https://blog.rust-lang.org/2022/01/13/Rust-1.58.0.html
2022-01-13 15:00:44 -08:00
Zak Cutner
92cfe63c4f Use single-threaded Tokio runtime (#360) 2022-01-06 20:59:51 -08:00
Tim Kuehn
839a2f067c Update example to latest version of Clap 2021-12-27 22:56:17 -08:00
David Kleingeld
b5d593488c Derive more traits for RpcError (#359)
Makes RpcError derive Clone, PartialEq, Eq, Hash, Serialize and Deserialize.
2021-12-27 22:00:58 -08:00
Tim Kuehn
eea38b8bf4 Simplify code with const assert!.
The code that prevents compilation on systems where usize is larger than
u64 previously used a const index-out-of-bounds trick. That code can now
be replaced with assert!, as const panic! has landed in 1.57.0 stable.
2021-12-03 15:20:33 -08:00
Shi Yan
70493c15f4 Fix a compiling issue of the official example (#358)
Fix a compiling issue of the official example because of the following error :

```
error[E0599]: the method `execute` exists for struct `BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>`, but its trait bounds were not satisfied
  --> src/main.rs:39:25
   |
39 |     tokio::spawn(server.execute(HelloServer.serve()));
   |                         ^^^^^^^ method cannot be called on `BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>` due to unsatisfied trait bounds
   |
   = note: the following trait bounds were not satisfied:
           `<&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>> as futures::Stream>::Item = _`
           which is required by `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: tarpc::server::incoming::Incoming<_>`
           `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: futures::Stream`
           which is required by `&BaseChannel<_, _, UnboundedChannel<ClientMessage<_>, Response<_>>>: tarpc::server::incoming::Incoming<_>`
   = help: items from traits can only be used if the trait is in scope
help: the following trait is implemented but not in scope; perhaps add a `use` for it:
   |
1  | use tarpc::server::Channel;
   |
```

See https://github.com/google/tarpc/pull/358#issuecomment-981953193 for the root cause.
2021-11-29 17:01:16 -08:00
baptiste0928
f7c5d6a7c3 Fix example-service (#355)
Fixes the compilation of the example-service crate (the Clap trait has been renamed Parser in clap-rs/clap@d840d56).
2021-11-15 08:39:04 -08:00
Scott Kirkpatrick
98c5d2a18b Re-add typo fixes (#353)
The typo fixes that were added by commit b5d9aaa
were accidentally reverted by commit 1e680e3, this
will add them back
2021-11-08 10:07:21 -08:00
Tim Kuehn
46b534f7c6 Use HashMap::shrink_to in impl of Comapct::compact. 2021-10-21 17:03:57 -07:00
Tim Kuehn
42b4fc52b1 Set rust-version to 1.56 2021-10-21 16:08:15 -07:00
Tim Kuehn
350dbcdad0 Upgrade to Rust 2021! 2021-10-21 14:10:21 -07:00
Tim Kuehn
b1b4461d89 Prepare release of 0.27.2 2021-10-08 22:31:56 -07:00
Tim Kuehn
f694b7573a Close TcpStream when client disconnects.
An attempt at a clean shutdown helps the server to drop its connections
more quickly.

Testing this uncovered a latent bug in DelayQueue wherein `poll_expired`
yields `Pending` when empty. A workaround was added to
`InFlightRequests::poll_expired`: check if there are actually any
outstanding requests before calling `DelayQueue::poll_expired`.
2021-10-08 22:13:24 -07:00
22 changed files with 188 additions and 81 deletions

View File

@@ -40,7 +40,7 @@ rather than in a separate language such as .proto. This means there's no separat
process, and no context switching between different languages.
Some other features of tarpc:
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
- Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
used as a transport to connect the client and server.
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
- Cascading cancellation: dropping a request will send a cancellation message to the server.
@@ -55,7 +55,7 @@ Some other features of tarpc:
[tracing](https://github.com/tokio-rs/tracing) primitives extended with
[OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
[Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
each RPC can be traced through the client, server, amd other dependencies downstream of the
each RPC can be traced through the client, server, and other dependencies downstream of the
server. Even for applications not connected to a distributed tracing collector, the
instrumentation can also be ingested by regular loggers like
[env_logger](https://github.com/env-logger-rs/env_logger/).
@@ -67,7 +67,7 @@ Some other features of tarpc:
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.27"
tarpc = "0.29"
```
The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -82,7 +82,7 @@ your `Cargo.toml`:
```toml
anyhow = "1.0"
futures = "0.3"
tarpc = { version = "0.27", features = ["tokio1"] }
tarpc = { version = "0.29", features = ["tokio1"] }
tokio = { version = "1.0", features = ["macros"] }
```
@@ -100,7 +100,7 @@ use futures::{
};
use tarpc::{
client, context,
server::{self, incoming::Incoming},
server::{self, incoming::Incoming, Channel},
};
// This is the service definition. It looks a lot like a trait definition.
@@ -128,7 +128,7 @@ impl World for HelloServer {
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
future::ready(format!("Hello, {name}!"))
}
}
```
@@ -155,7 +155,7 @@ async fn main() -> anyhow::Result<()> {
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello);
println!("{hello}");
Ok(())
}

View File

@@ -1,8 +1,30 @@
## 0.29.0 (2022-05-26)
### Breaking Changes
`Context.deadline` is now serialized as a Duration. This prevents clock skew from affecting deadline
behavior. For more details see https://github.com/google/tarpc/pull/367 and its [related
issue](https://github.com/google/tarpc/issues/366).
## 0.28.0 (2022-04-06)
### Breaking Changes
- The minimum supported Rust version has increased to 1.58.0.
- The version of opentelemetry depended on by tarpc has increased to 0.17.0.
## 0.27.2 (2021-10-08)
### Fixes
Clients will now close their transport before dropping it. An attempt at a clean shutdown can help
the server drop its connections more quickly.
## 0.27.1 (2021-09-22)
### Breaking Changes
### RPC error type is changing
#### RPC error type is changing
RPC return types are changing from `Result<Response, io::Error>` to `Result<Response,
tarpc::client::RpcError>`.

View File

@@ -1,8 +1,9 @@
[package]
name = "tarpc-example-service"
version = "0.10.0"
version = "0.11.0"
rust-version = "1.56"
authors = ["Tim Kuehn <tikue@google.com>"]
edition = "2018"
edition = "2021"
license = "MIT"
documentation = "https://docs.rs/tarpc-example-service"
homepage = "https://github.com/google/tarpc"
@@ -14,13 +15,13 @@ description = "An example server built on tarpc."
[dependencies]
anyhow = "1.0"
clap = "3.0.0-beta.2"
clap = { version = "3.0.0-rc.9", features = ["derive"] }
log = "0.4"
futures = "0.3"
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
rand = "0.8"
tarpc = { version = "0.27", path = "../tarpc", features = ["full"] }
tarpc = { version = "0.29", path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
tracing = { version = "0.1" }
tracing-opentelemetry = "0.15"

View File

@@ -4,14 +4,14 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use clap::Clap;
use clap::Parser;
use service::{init_tracing, WorldClient};
use std::{net::SocketAddr, time::Duration};
use tarpc::{client, context, tokio_serde::formats::Json};
use tokio::time::sleep;
use tracing::Instrument;
#[derive(Clap)]
#[derive(Parser)]
struct Flags {
/// Sets the server address to connect to.
#[clap(long)]

View File

@@ -4,7 +4,7 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use clap::Clap;
use clap::Parser;
use futures::{future, prelude::*};
use rand::{
distributions::{Distribution, Uniform},
@@ -22,7 +22,7 @@ use tarpc::{
};
use tokio::time;
#[derive(Clap)]
#[derive(Parser)]
struct Flags {
/// Sets the port number to listen on.
#[clap(long)]
@@ -40,7 +40,7 @@ impl World for HelloServer {
let sleep_time =
Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
time::sleep(sleep_time).await;
format!("Hello, {}! You are connected from {}", name, self.0)
format!("Hello, {name}! You are connected from {}", self.0)
}
}

View File

@@ -1,8 +1,9 @@
[package]
name = "tarpc-plugins"
version = "0.12.0"
rust-version = "1.56"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
edition = "2021"
license = "MIT"
documentation = "https://docs.rs/tarpc-plugins"
homepage = "https://github.com/google/tarpc"

View File

@@ -83,7 +83,7 @@ impl Parse for Service {
ident_errors,
syn::Error::new(
rpc.ident.span(),
format!("method name conflicts with generated fn `{}::serve`", ident)
format!("method name conflicts with generated fn `{ident}::serve`")
)
);
}
@@ -270,7 +270,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
let methods = rpcs.iter().map(|rpc| &rpc.ident).collect::<Vec<_>>();
let request_names = methods
.iter()
.map(|m| format!("{}.{}", ident, m))
.map(|m| format!("{ident}.{m}"))
.collect::<Vec<_>>();
ServiceGenerator {
@@ -306,7 +306,7 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
.collect::<Vec<_>>(),
future_types: &camel_case_fn_names
.iter()
.map(|name| parse_str(&format!("{}Fut", name)).unwrap())
.map(|name| parse_str(&format!("{name}Fut")).unwrap())
.collect::<Vec<_>>(),
derive_serialize: derive_serialize.as_ref(),
}
@@ -409,7 +409,7 @@ fn verify_types_were_provided(
if !provided.iter().any(|typedecl| typedecl.ident == expected) {
let mut e = syn::Error::new(
span,
format!("not all trait items implemented, missing: `{}`", expected),
format!("not all trait items implemented, missing: `{expected}`"),
);
let fn_span = method.sig.fn_token.span();
e.extend(syn::Error::new(
@@ -479,7 +479,7 @@ impl<'a> ServiceGenerator<'a> {
),
output,
)| {
let ty_doc = format!("The response future returned by [`{}::{}`].", service_ident, ident);
let ty_doc = format!("The response future returned by [`{service_ident}::{ident}`].");
quote! {
#[doc = #ty_doc]
type #future_type: std::future::Future<Output = #output>;

View File

@@ -1,8 +1,12 @@
[package]
name = "tarpc"
version = "0.27.1"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018"
version = "0.29.0"
rust-version = "1.58.0"
authors = [
"Adam Wright <adam.austin.wright@gmail.com>",
"Tim Kuehn <timothy.j.kuehn@gmail.com>",
]
edition = "2021"
license = "MIT"
documentation = "https://docs.rs/tarpc"
homepage = "https://github.com/google/tarpc"
@@ -16,13 +20,20 @@ description = "An RPC framework for Rust with a focus on ease of use."
default = []
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
tokio1 = ["tokio/rt-multi-thread"]
tokio1 = ["tokio/rt"]
serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
serde-transport-json = ["tokio-serde/json"]
serde-transport-bincode = ["tokio-serde/bincode"]
tcp = ["tokio/net"]
full = ["serde1", "tokio1", "serde-transport", "serde-transport-json", "serde-transport-bincode", "tcp"]
full = [
"serde1",
"tokio1",
"serde-transport",
"serde-transport-json",
"serde-transport-bincode",
"tcp",
]
[badges]
travis-ci = { repository = "google/tarpc" }
@@ -39,11 +50,14 @@ static_assertions = "1.1.0"
tarpc-plugins = { path = "../plugins", version = "0.12" }
thiserror = "1.0"
tokio = { version = "1", features = ["time"] }
tokio-util = { version = "0.6.3", features = ["time"] }
tokio-util = { version = "0.6.9", features = ["time"] }
tokio-serde = { optional = true, version = "0.8" }
tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
tracing-opentelemetry = { version = "0.15", default-features = false }
opentelemetry = { version = "0.16", default-features = false }
tracing = { version = "0.1", default-features = false, features = [
"attributes",
"log",
] }
tracing-opentelemetry = { version = "0.17.2", default-features = false }
opentelemetry = { version = "0.17.0", default-features = false }
[dev-dependencies]
@@ -52,11 +66,13 @@ bincode = "1.3"
bytes = { version = "1", features = ["serde"] }
flate2 = "1.0"
futures-test = "0.3"
opentelemetry = { version = "0.16", default-features = false, features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
opentelemetry = { version = "0.17.0", default-features = false, features = [
"rt-tokio",
] }
opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio"] }
pin-utils = "0.1.0-alpha"
serde_bytes = "0.11"
tracing-subscriber = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio-serde = { version = "0.8", features = ["json", "bincode"] }
trybuild = "1.0"

View File

@@ -54,7 +54,7 @@ where
if algorithm != CompressionAlgorithm::Deflate {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Compression algorithm {:?} not supported", algorithm),
format!("Compression algorithm {algorithm:?} not supported"),
));
}
let mut deflater = DeflateDecoder::new(payload.as_slice());
@@ -102,7 +102,7 @@ struct HelloServer;
#[tarpc::server]
impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
format!("Hey, {}!", name)
format!("Hey, {name}!")
}
}

View File

@@ -290,7 +290,7 @@ fn init_tracing(service_name: &str) -> anyhow::Result<()> {
.install_batch(opentelemetry::runtime::Tokio)?;
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::filter::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer())
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()?;

View File

@@ -29,7 +29,7 @@ impl World for HelloServer {
type HelloFut = Ready<String>;
fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
future::ready(format!("Hello, {}!", name))
future::ready(format!("Hello, {name}!"))
}
}
@@ -49,7 +49,7 @@ async fn main() -> anyhow::Result<()> {
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;
println!("{}", hello);
println!("{hello}");
Ok(())
}

View File

@@ -6,7 +6,6 @@
use crate::{add::Add as AddService, double::Double as DoubleService};
use futures::{future, prelude::*};
use std::env;
use tarpc::{
client, context,
server::{incoming::Incoming, BaseChannel},
@@ -56,9 +55,9 @@ impl DoubleService for DoubleServer {
}
fn init_tracing(service_name: &str) -> anyhow::Result<()> {
env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name(service_name)
.with_auto_split_batch(true)
.with_max_packet_size(2usize.pow(13))
.install_batch(opentelemetry::runtime::Tokio)?;

View File

@@ -81,14 +81,10 @@ impl<C, D> fmt::Debug for NewClient<C, D> {
}
}
#[allow(dead_code)]
#[allow(clippy::no_effect)]
const CHECK_USIZE: () = {
if std::mem::size_of::<usize>() > std::mem::size_of::<u64>() {
// TODO: replace this with panic!() as soon as RFC 2345 gets stabilized
["usize is too big to fit in u64"][42];
}
};
const _CHECK_USIZE: () = assert!(
std::mem::size_of::<usize>() <= std::mem::size_of::<u64>(),
"usize is too big to fit in u64"
);
/// Handles communication from the client to request dispatch.
#[derive(Debug)]
@@ -118,6 +114,7 @@ impl<Req, Resp> Channel<Req, Resp> {
skip(self, ctx, request_name, request),
fields(
rpc.trace_id = tracing::field::Empty,
rpc.deadline = %humantime::format_rfc3339(ctx.deadline),
otel.kind = "client",
otel.name = request_name)
)]
@@ -172,7 +169,8 @@ struct ResponseGuard<'a, Resp> {
/// An error that can occur in the processing of an RPC. This is not request-specific errors but
/// rather cross-cutting errors that can always occur.
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum RpcError {
/// The client disconnected from the server.
#[error("the client disconnected from the server")]
@@ -522,11 +520,7 @@ where
},
});
self.start_send(request)?;
let deadline = ctx.deadline;
tracing::info!(
tarpc.deadline = %humantime::format_rfc3339(deadline),
"SendRequest"
);
tracing::info!("SendRequest");
drop(entered);
self.in_flight_requests()

View File

@@ -28,6 +28,8 @@ pub struct Context {
/// When the client expects the request to be complete by. The server should cancel the request
/// if it is not complete by this time.
#[cfg_attr(feature = "serde1", serde(default = "ten_seconds_from_now"))]
// Serialized as a Duration to prevent clock skew issues.
#[cfg_attr(feature = "serde1", serde(with = "absolute_to_relative_time"))]
pub deadline: SystemTime,
/// Uniquely identifies requests originating from the same source.
/// When a service handles a request by making requests itself, those requests should
@@ -36,6 +38,54 @@ pub struct Context {
pub trace_context: trace::Context,
}
#[cfg(feature = "serde1")]
mod absolute_to_relative_time {
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub use std::time::{Duration, SystemTime};
pub fn serialize<S>(deadline: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let deadline = deadline
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO);
deadline.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
where
D: Deserializer<'de>,
{
let deadline = Duration::deserialize(deserializer)?;
Ok(SystemTime::now() + deadline)
}
#[cfg(test)]
#[derive(serde::Serialize, serde::Deserialize)]
struct AbsoluteToRelative(#[serde(with = "self")] SystemTime);
#[test]
fn test_serialize() {
let now = SystemTime::now();
let deadline = now + Duration::from_secs(10);
let serialized_deadline = bincode::serialize(&AbsoluteToRelative(deadline)).unwrap();
let deserialized_deadline: Duration = bincode::deserialize(&serialized_deadline).unwrap();
// TODO: how to avoid flakiness?
assert!(deserialized_deadline > Duration::from_secs(9));
}
#[test]
fn test_deserialize() {
let deadline = Duration::from_secs(10);
let serialized_deadline = bincode::serialize(&deadline).unwrap();
let AbsoluteToRelative(deserialized_deadline) =
bincode::deserialize(&serialized_deadline).unwrap();
// TODO: how to avoid flakiness?
assert!(deserialized_deadline > SystemTime::now() + Duration::from_secs(9));
}
}
assert_impl_all!(Context: Send, Sync);
fn ten_seconds_from_now() -> SystemTime {

View File

@@ -27,7 +27,7 @@
//! process, and no context switching between different languages.
//!
//! Some other features of tarpc:
//! - Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
//! - Pluggable transport: any type implementing `Stream<Item = Request> + Sink<Response>` can be
//! used as a transport to connect the client and server.
//! - `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
//! - Cascading cancellation: dropping a request will send a cancellation message to the server.
@@ -42,7 +42,7 @@
//! [tracing](https://github.com/tokio-rs/tracing) primitives extended with
//! [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible tracing subscriber like
//! [Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
//! each RPC can be traced through the client, server, amd other dependencies downstream of the
//! each RPC can be traced through the client, server, and other dependencies downstream of the
//! server. Even for applications not connected to a distributed tracing collector, the
//! instrumentation can also be ingested by regular loggers like
//! [env_logger](https://github.com/env-logger-rs/env_logger/).
@@ -54,7 +54,7 @@
//! Add to your `Cargo.toml` dependencies:
//!
//! ```toml
//! tarpc = "0.27"
//! tarpc = "0.29"
//! ```
//!
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -69,7 +69,7 @@
//! ```toml
//! anyhow = "1.0"
//! futures = "0.3"
//! tarpc = { version = "0.27", features = ["tokio1"] }
//! tarpc = { version = "0.29", features = ["tokio1"] }
//! tokio = { version = "1.0", features = ["macros"] }
//! ```
//!
@@ -88,7 +88,7 @@
//! };
//! use tarpc::{
//! client, context,
//! server::{self, incoming::Incoming},
//! server::{self, incoming::Incoming, Channel},
//! };
//!
//! // This is the service definition. It looks a lot like a trait definition.
@@ -132,7 +132,7 @@
//! type HelloFut = Ready<String>;
//!
//! fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! future::ready(format!("Hello, {}!", name))
//! future::ready(format!("Hello, {name}!"))
//! }
//! }
//! ```
@@ -168,7 +168,7 @@
//! # // an associated type representing the future output by the fn.
//! # type HelloFut = Ready<String>;
//! # fn hello(self, _: context::Context, name: String) -> Self::HelloFut {
//! # future::ready(format!("Hello, {}!", name))
//! # future::ready(format!("Hello, {name}!"))
//! # }
//! # }
//! # #[cfg(not(feature = "tokio1"))]
@@ -190,7 +190,7 @@
//! // specifies a deadline and trace information which can be helpful in debugging requests.
//! let hello = client.hello(context::current(), "Stim".to_string()).await?;
//!
//! println!("{}", hello);
//! println!("{hello}");
//!
//! Ok(())
//! }
@@ -264,7 +264,7 @@ pub use tarpc_plugins::service;
/// #[tarpc::server]
/// impl World for HelloServer {
/// async fn hello(self, _: context::Context, name: String) -> String {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
/// format!("Hello, {name}! You are connected from {:?}.", self.0)
/// }
/// }
/// ```
@@ -290,7 +290,7 @@ pub use tarpc_plugins::service;
/// fn hello(self, _: context::Context, name: String) -> Pin<Box<dyn Future<Output = String>
/// + Send>> {
/// Box::pin(async move {
/// format!("Hello, {}! You are connected from {:?}.", name, self.0)
/// format!("Hello, {name}! You are connected from {:?}.", self.0)
/// })
/// }
/// }

View File

@@ -161,6 +161,7 @@ where
let span = info_span!(
"RPC",
rpc.trace_id = %request.context.trace_id(),
rpc.deadline = %humantime::format_rfc3339(request.context.deadline),
otel.kind = "server",
otel.name = tracing::field::Empty,
);

View File

@@ -138,25 +138,25 @@ impl From<u64> for SpanId {
impl From<opentelemetry::trace::TraceId> for TraceId {
fn from(trace_id: opentelemetry::trace::TraceId) -> Self {
Self::from(trace_id.to_u128())
Self::from(u128::from_be_bytes(trace_id.to_bytes()))
}
}
impl From<TraceId> for opentelemetry::trace::TraceId {
fn from(trace_id: TraceId) -> Self {
Self::from_u128(trace_id.into())
Self::from_bytes(u128::from(trace_id).to_be_bytes())
}
}
impl From<opentelemetry::trace::SpanId> for SpanId {
fn from(span_id: opentelemetry::trace::SpanId) -> Self {
Self::from(span_id.to_u64())
Self::from(u64::from_be_bytes(span_id.to_bytes()))
}
}
impl From<SpanId> for opentelemetry::trace::SpanId {
fn from(span_id: SpanId) -> Self {
Self::from_u64(span_id.0)
Self::from_bytes(u64::from(span_id).to_be_bytes())
}
}

View File

@@ -181,7 +181,7 @@ mod tests {
future::ready(request.parse::<u64>().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("{:?} is not an int", request),
format!("{request:?} is not an int"),
)
}))
}),

View File

@@ -38,11 +38,34 @@ where
H: BuildHasher,
{
fn compact(&mut self, usage_ratio_threshold: f64) {
if self.capacity() > 1000 {
let usage_ratio = self.len() as f64 / self.capacity() as f64;
if usage_ratio < usage_ratio_threshold {
self.shrink_to_fit();
}
}
let usage_ratio_threshold = usage_ratio_threshold.clamp(f64::MIN_POSITIVE, 1.);
let cap = f64::max(1000., self.len() as f64 / usage_ratio_threshold);
self.shrink_to(cap as usize);
}
}
#[test]
fn test_compact() {
let mut map = HashMap::with_capacity(2048);
assert_eq!(map.capacity(), 3584);
// Make usage ratio 25%
for i in 0..896 {
map.insert(format!("k{i}"), "v");
}
map.compact(-1.0);
assert_eq!(map.capacity(), 3584);
map.compact(0.25);
assert_eq!(map.capacity(), 3584);
map.compact(0.50);
assert_eq!(map.capacity(), 1792);
map.compact(1.0);
assert_eq!(map.capacity(), 1792);
map.compact(2.0);
assert_eq!(map.capacity(), 1792);
}

View File

@@ -7,8 +7,8 @@ struct HelloServer;
#[tarpc::server]
impl World for HelloServer {
fn hello(name: String) -> String {
format!("Hello, {}!", name)
fn hello(name: String) -> String {
format!("Hello, {name}!", name)
}
}

View File

@@ -7,5 +7,5 @@ error: not all trait items implemented, missing: `HelloFut`
error: hint: `#[tarpc::server]` only rewrites async fns, and `fn hello` is not async
--> $DIR/tarpc_server_missing_async.rs:10:5
|
10 | fn hello(name: String) -> String {
10 | fn hello(name: String) -> String {
| ^^

View File

@@ -31,7 +31,7 @@ impl Service for Server {
type HeyFut = Ready<String>;
fn hey(self, _: context::Context, name: String) -> Self::HeyFut {
ready(format!("Hey, {}.", name))
ready(format!("Hey, {name}."))
}
}